mirror of https://github.com/apache/kafka.git
KAFKA-19244: Add support for kafka-streams-groups.sh options (delete all groups) [2/N] (#19758)
This PR implements all the options for `--delete --group grpId` and `--delete --all-groups` Tests: Integration tests and unit tests. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
2919478d00
commit
2a2626b3d8
|
@ -17,7 +17,10 @@
|
||||||
package org.apache.kafka.tools.streams;
|
package org.apache.kafka.tools.streams;
|
||||||
|
|
||||||
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.admin.AbstractOptions;
|
||||||
import org.apache.kafka.clients.admin.Admin;
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
|
||||||
|
import org.apache.kafka.clients.admin.DeleteTopicsResult;
|
||||||
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
|
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
|
||||||
import org.apache.kafka.clients.admin.GroupListing;
|
import org.apache.kafka.clients.admin.GroupListing;
|
||||||
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
|
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
|
||||||
|
@ -32,7 +35,10 @@ import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
|
||||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||||
import org.apache.kafka.common.GroupState;
|
import org.apache.kafka.common.GroupState;
|
||||||
import org.apache.kafka.common.GroupType;
|
import org.apache.kafka.common.GroupType;
|
||||||
|
import org.apache.kafka.common.KafkaFuture;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
||||||
|
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.server.util.CommandLineUtils;
|
import org.apache.kafka.server.util.CommandLineUtils;
|
||||||
|
|
||||||
|
@ -42,6 +48,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -61,9 +68,9 @@ public class StreamsGroupCommand {
|
||||||
opts.checkArgs();
|
opts.checkArgs();
|
||||||
|
|
||||||
// should have exactly one action
|
// should have exactly one action
|
||||||
long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt).filter(opts.options::has).count();
|
long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt, opts.deleteOpt).filter(opts.options::has).count();
|
||||||
if (numberOfActions != 1)
|
if (numberOfActions != 1)
|
||||||
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, or --describe.");
|
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, or --delete.");
|
||||||
|
|
||||||
run(opts);
|
run(opts);
|
||||||
} catch (OptionException e) {
|
} catch (OptionException e) {
|
||||||
|
@ -77,6 +84,8 @@ public class StreamsGroupCommand {
|
||||||
streamsGroupService.listGroups();
|
streamsGroupService.listGroups();
|
||||||
} else if (opts.options.has(opts.describeOpt)) {
|
} else if (opts.options.has(opts.describeOpt)) {
|
||||||
streamsGroupService.describeGroups();
|
streamsGroupService.describeGroups();
|
||||||
|
} else if (opts.options.has(opts.deleteOpt)) {
|
||||||
|
streamsGroupService.deleteGroups();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Unknown action!");
|
throw new IllegalArgumentException("Unknown action!");
|
||||||
}
|
}
|
||||||
|
@ -396,6 +405,203 @@ public class StreamsGroupCommand {
|
||||||
props.putAll(configOverrides);
|
props.putAll(configOverrides);
|
||||||
return Admin.create(props);
|
return Admin.create(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Map<String, Throwable> deleteGroups() {
|
||||||
|
List<String> groupIds = opts.options.has(opts.allGroupsOpt)
|
||||||
|
? new ArrayList<>(listStreamsGroups())
|
||||||
|
: new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
|
||||||
|
|
||||||
|
// pre admin call checks
|
||||||
|
Map<String, Throwable> failed = preAdminCallChecks(groupIds);
|
||||||
|
|
||||||
|
groupIds.removeAll(failed.keySet());
|
||||||
|
Map<String, Throwable> success = new HashMap<>();
|
||||||
|
Map<String, List<String>> internalTopics = new HashMap<>();
|
||||||
|
Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>();
|
||||||
|
if (!groupIds.isEmpty()) {
|
||||||
|
// retrieve internal topics before deleting groups
|
||||||
|
internalTopics = retrieveInternalTopics(groupIds);
|
||||||
|
|
||||||
|
// delete streams groups
|
||||||
|
Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteStreamsGroups(
|
||||||
|
groupIds,
|
||||||
|
withTimeoutMs(new DeleteStreamsGroupsOptions())
|
||||||
|
).deletedGroups();
|
||||||
|
|
||||||
|
groupsToDelete.forEach((g, f) -> {
|
||||||
|
try {
|
||||||
|
f.get();
|
||||||
|
success.put(g, null);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
failed.put(g, ie);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
failed.put(g, e.getCause());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// delete internal topics
|
||||||
|
if (!success.isEmpty()) {
|
||||||
|
for (String groupId : success.keySet()) {
|
||||||
|
List<String> internalTopicsToDelete = internalTopics.get(groupId);
|
||||||
|
if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) {
|
||||||
|
DeleteTopicsResult deleteTopicsResult = null;
|
||||||
|
try {
|
||||||
|
deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete);
|
||||||
|
deleteTopicsResult.all().get();
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
if (deleteTopicsResult != null) {
|
||||||
|
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (Exception topicException) {
|
||||||
|
System.out.println("Failed to delete internal topic: " + topic);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
internalTopicsDeletionFailures.put(groupId, e.getCause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// display outcome messages based on the results
|
||||||
|
if (failed.isEmpty()) {
|
||||||
|
System.out.println("Deletion of requested streams groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "') was successful.");
|
||||||
|
} else {
|
||||||
|
printError("Deletion of some streams groups failed:", Optional.empty());
|
||||||
|
failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error));
|
||||||
|
|
||||||
|
if (!success.isEmpty()) {
|
||||||
|
System.out.println("\nThese streams groups were deleted successfully: " + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "'.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!internalTopics.keySet().isEmpty()) {
|
||||||
|
printInternalTopicErrors(internalTopicsDeletionFailures, success.keySet(), internalTopics.keySet());
|
||||||
|
}
|
||||||
|
// for testing purpose: return all failures, including internal topics deletion failures
|
||||||
|
failed.putAll(success);
|
||||||
|
failed.putAll(internalTopicsDeletionFailures);
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Throwable> preAdminCallChecks(List<String> groupIds) {
|
||||||
|
List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
|
||||||
|
LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
|
||||||
|
|
||||||
|
Map<String, Throwable> failed = new HashMap<>();
|
||||||
|
|
||||||
|
for (String groupId : groupIdSet) {
|
||||||
|
Optional<GroupListing> listing = streamsGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny();
|
||||||
|
if (listing.isEmpty()) {
|
||||||
|
failed.put(groupId, new IllegalArgumentException("Group '" + groupId + "' does not exist or is not a streams group."));
|
||||||
|
} else {
|
||||||
|
Optional<GroupState> groupState = listing.get().groupState();
|
||||||
|
groupState.ifPresent(state -> {
|
||||||
|
if (state == GroupState.DEAD) {
|
||||||
|
failed.put(groupId, new IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
|
||||||
|
} else if (state != GroupState.EMPTY) {
|
||||||
|
failed.put(groupId, new GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<GroupListing> listDetailedStreamsGroups() {
|
||||||
|
try {
|
||||||
|
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
|
||||||
|
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
|
||||||
|
.withTypes(Set.of(GroupType.STREAMS)));
|
||||||
|
Collection<GroupListing> listings = result.all().get();
|
||||||
|
return listings.stream().toList();
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void printInternalTopicErrors(Map<String, Throwable> internalTopicsDeletionFailures,
|
||||||
|
Set<String> deletedGroupIds,
|
||||||
|
Set<String> groupIdsWithInternalTopics) {
|
||||||
|
if (!deletedGroupIds.isEmpty()) {
|
||||||
|
if (internalTopicsDeletionFailures.isEmpty()) {
|
||||||
|
List<String> successfulGroups = deletedGroupIds.stream()
|
||||||
|
.filter(groupIdsWithInternalTopics::contains)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
System.out.println("Deletion of associated internal topics of the streams groups ('" +
|
||||||
|
String.join("', '", successfulGroups) + "') was successful.");
|
||||||
|
} else {
|
||||||
|
System.out.println("Deletion of some associated internal topics failed:");
|
||||||
|
internalTopicsDeletionFailures.forEach((group, error) ->
|
||||||
|
System.out.println("* Internal topics of the streams group '" + group + "' could not be deleted due to: " + error));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Visibility for testing
|
||||||
|
Map<String, List<String>> retrieveInternalTopics(List<String> groupIds) {
|
||||||
|
Map<String, List<String>> groupToInternalTopics = new HashMap<>();
|
||||||
|
try {
|
||||||
|
Map<String, StreamsGroupDescription> descriptionMap = adminClient.describeStreamsGroups(groupIds).all().get();
|
||||||
|
for (StreamsGroupDescription description : descriptionMap.values()) {
|
||||||
|
|
||||||
|
List<String> sourceTopics = description.subtopologies().stream()
|
||||||
|
.flatMap(subtopology -> subtopology.sourceTopics().stream()).toList();
|
||||||
|
|
||||||
|
List<String> internalTopics = description.subtopologies().stream()
|
||||||
|
.flatMap(subtopology -> Stream.concat(
|
||||||
|
subtopology.repartitionSourceTopics().keySet().stream(),
|
||||||
|
subtopology.stateChangelogTopics().keySet().stream()))
|
||||||
|
.filter(topic -> !sourceTopics.contains(topic))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
internalTopics.removeIf(topic -> {
|
||||||
|
if (!isInferredInternalTopic(topic, description.groupId())) {
|
||||||
|
printError("The internal topic '" + topic + "' is not inferred as internal " +
|
||||||
|
"and thus will not be deleted with the group '" + description.groupId() + "'.", Optional.empty());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
if (!internalTopics.isEmpty()) {
|
||||||
|
groupToInternalTopics.put(description.groupId(), internalTopics);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
if (e.getCause() instanceof UnsupportedVersionException) {
|
||||||
|
printError("Retrieving internal topics is not supported by the broker version. " +
|
||||||
|
"Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause()));
|
||||||
|
} else {
|
||||||
|
printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return groupToInternalTopics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isInferredInternalTopic(final String topicName, final String applicationId) {
|
||||||
|
return topicName.startsWith(applicationId + "-") && matchesInternalTopicFormat(topicName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean matchesInternalTopicFormat(final String topicName) {
|
||||||
|
return topicName.endsWith("-changelog") || topicName.endsWith("-repartition")
|
||||||
|
|| topicName.endsWith("-subscription-registration-topic")
|
||||||
|
|| topicName.endsWith("-subscription-response-topic")
|
||||||
|
|| topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
|
||||||
|
|| topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<StreamsGroupMemberDescription> collectGroupMembers(String groupId) throws Exception {
|
||||||
|
return getDescribeGroup(groupId).members();
|
||||||
|
}
|
||||||
|
|
||||||
|
GroupState collectGroupState(String groupId) throws Exception {
|
||||||
|
return getDescribeGroup(groupId).groupState();
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
|
||||||
|
int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
|
||||||
|
return options.timeoutMs(t);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> leaderEpoch, Long logEndOffset, Long lag) {
|
public record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> leaderEpoch, Long logEndOffset, Long lag) {
|
||||||
|
|
|
@ -23,11 +23,15 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import joptsimple.OptionSpec;
|
import joptsimple.OptionSpec;
|
||||||
|
|
||||||
|
import static org.apache.kafka.tools.ToolsUtils.minus;
|
||||||
|
|
||||||
public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
public static final Logger LOGGER = LoggerFactory.getLogger(StreamsGroupCommandOptions.class);
|
public static final Logger LOGGER = LoggerFactory.getLogger(StreamsGroupCommandOptions.class);
|
||||||
|
|
||||||
|
@ -35,6 +39,9 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
public static final String GROUP_DOC = "The streams group we wish to act on.";
|
public static final String GROUP_DOC = "The streams group we wish to act on.";
|
||||||
public static final String LIST_DOC = "List all streams groups.";
|
public static final String LIST_DOC = "List all streams groups.";
|
||||||
public static final String DESCRIBE_DOC = "Describe streams group and list offset lag related to given group.";
|
public static final String DESCRIBE_DOC = "Describe streams group and list offset lag related to given group.";
|
||||||
|
private static final String ALL_GROUPS_DOC = "Apply to all streams groups.";
|
||||||
|
private static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " +
|
||||||
|
"over the entire streams group. For instance --group g1 --group g2";
|
||||||
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
|
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
|
||||||
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes.";
|
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes.";
|
||||||
public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
|
public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
|
||||||
|
@ -52,6 +59,8 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
public final OptionSpec<String> groupOpt;
|
public final OptionSpec<String> groupOpt;
|
||||||
public final OptionSpec<Void> listOpt;
|
public final OptionSpec<Void> listOpt;
|
||||||
public final OptionSpec<Void> describeOpt;
|
public final OptionSpec<Void> describeOpt;
|
||||||
|
final OptionSpec<Void> allGroupsOpt;
|
||||||
|
final OptionSpec<Void> deleteOpt;
|
||||||
public final OptionSpec<Long> timeoutMsOpt;
|
public final OptionSpec<Long> timeoutMsOpt;
|
||||||
public final OptionSpec<String> commandConfigOpt;
|
public final OptionSpec<String> commandConfigOpt;
|
||||||
public final OptionSpec<String> stateOpt;
|
public final OptionSpec<String> stateOpt;
|
||||||
|
@ -59,6 +68,9 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
public final OptionSpec<Void> offsetsOpt;
|
public final OptionSpec<Void> offsetsOpt;
|
||||||
public final OptionSpec<Void> verboseOpt;
|
public final OptionSpec<Void> verboseOpt;
|
||||||
|
|
||||||
|
final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
|
||||||
|
final Set<OptionSpec<?>> allStreamsGroupLevelOpts;
|
||||||
|
|
||||||
public static StreamsGroupCommandOptions fromArgs(String[] args) {
|
public static StreamsGroupCommandOptions fromArgs(String[] args) {
|
||||||
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
|
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
|
||||||
opts.checkArgs();
|
opts.checkArgs();
|
||||||
|
@ -78,6 +90,8 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
.ofType(String.class);
|
.ofType(String.class);
|
||||||
listOpt = parser.accepts("list", LIST_DOC);
|
listOpt = parser.accepts("list", LIST_DOC);
|
||||||
describeOpt = parser.accepts("describe", DESCRIBE_DOC);
|
describeOpt = parser.accepts("describe", DESCRIBE_DOC);
|
||||||
|
allGroupsOpt = parser.accepts("all-groups", ALL_GROUPS_DOC);
|
||||||
|
deleteOpt = parser.accepts("delete", DELETE_DOC);
|
||||||
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
|
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
|
||||||
.availableIf(describeOpt)
|
.availableIf(describeOpt)
|
||||||
.withRequiredArg()
|
.withRequiredArg()
|
||||||
|
@ -99,6 +113,8 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
|
verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
|
||||||
.availableIf(describeOpt);
|
.availableIf(describeOpt);
|
||||||
|
|
||||||
|
allStreamsGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt));
|
||||||
|
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
|
||||||
options = parser.parse(args);
|
options = parser.parse(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,6 +137,13 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used.");
|
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (options.has(deleteOpt)) {
|
||||||
|
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
||||||
|
CommandLineUtils.printUsageAndExit(parser,
|
||||||
|
"Option " + deleteOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
|
||||||
|
}
|
||||||
|
|
||||||
CommandLineUtils.checkInvalidArgs(parser, options, listOpt, membersOpt, offsetsOpt);
|
CommandLineUtils.checkInvalidArgs(parser, options, listOpt, membersOpt, offsetsOpt);
|
||||||
|
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allStreamsGroupLevelOpts, describeOpt, deleteOpt));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,414 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.tools.streams;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
|
import org.apache.kafka.clients.admin.FeatureUpdate;
|
||||||
|
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.GroupState;
|
||||||
|
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
||||||
|
import org.apache.kafka.common.protocol.Errors;
|
||||||
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.apache.kafka.common.utils.Exit;
|
||||||
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
|
||||||
|
import org.apache.kafka.streams.GroupProtocol;
|
||||||
|
import org.apache.kafka.streams.KafkaStreams;
|
||||||
|
import org.apache.kafka.streams.KeyValueTimestamp;
|
||||||
|
import org.apache.kafka.streams.StreamsBuilder;
|
||||||
|
import org.apache.kafka.streams.StreamsConfig;
|
||||||
|
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
|
||||||
|
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
|
||||||
|
import org.apache.kafka.streams.kstream.Consumed;
|
||||||
|
import org.apache.kafka.streams.kstream.KStream;
|
||||||
|
import org.apache.kafka.streams.kstream.KTable;
|
||||||
|
import org.apache.kafka.streams.kstream.Materialized;
|
||||||
|
import org.apache.kafka.streams.kstream.Produced;
|
||||||
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
import org.apache.kafka.tools.ToolsTestUtils;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Tag;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.Timeout;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import joptsimple.OptionException;
|
||||||
|
|
||||||
|
import static org.apache.kafka.common.GroupState.EMPTY;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
@Timeout(600)
|
||||||
|
@Tag("integration")
|
||||||
|
public class DeleteStreamsGroupTest {
|
||||||
|
private static final String INPUT_TOPIC_PREFIX = "input-topic-";
|
||||||
|
private static final String OUTPUT_TOPIC_PREFIX = "output-topic-";
|
||||||
|
private static final String APP_ID_PREFIX = "delete-group-test-";
|
||||||
|
private static final int RECORD_TOTAL = 10;
|
||||||
|
public static EmbeddedKafkaCluster cluster;
|
||||||
|
private static String bootstrapServers;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void startCluster() {
|
||||||
|
final Properties props = new Properties();
|
||||||
|
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
|
||||||
|
cluster = new EmbeddedKafkaCluster(2, props);
|
||||||
|
cluster.start();
|
||||||
|
|
||||||
|
bootstrapServers = cluster.bootstrapServers();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void closeCluster() {
|
||||||
|
cluster.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteWithUnrecognizedOption() {
|
||||||
|
final String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};
|
||||||
|
assertThrows(OptionException.class, () -> getStreamsGroupService(args));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteWithoutGroupOption() {
|
||||||
|
final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete"};
|
||||||
|
AtomicBoolean exited = new AtomicBoolean(false);
|
||||||
|
Exit.setExitProcedure(((statusCode, message) -> {
|
||||||
|
assertNotEquals(0, statusCode);
|
||||||
|
assertTrue(message.contains("Option [delete] takes one of these options: [all-groups], [group]"));
|
||||||
|
exited.set(true);
|
||||||
|
}));
|
||||||
|
try {
|
||||||
|
getStreamsGroupService(args);
|
||||||
|
} finally {
|
||||||
|
assertTrue(exited.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteSingleGroup() throws Exception {
|
||||||
|
final String appId = generateGroupAppId();
|
||||||
|
String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--group", appId};
|
||||||
|
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
|
||||||
|
try (KafkaStreams streams = startKSApp(appId, service)) {
|
||||||
|
/* test 1: delete NON_EMPTY streams group */
|
||||||
|
String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
|
||||||
|
Map<String, Throwable> result = service.deleteGroups();
|
||||||
|
|
||||||
|
assertTrue(output.contains("Group '" + appId + "' could not be deleted due to:")
|
||||||
|
&& output.contains("Streams group '" + appId + "' is not EMPTY."),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")");
|
||||||
|
|
||||||
|
assertNotNull(result.get(appId),
|
||||||
|
"Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
|
||||||
|
|
||||||
|
assertEquals(1, result.size());
|
||||||
|
assertInstanceOf(GroupNotEmptyException.class,
|
||||||
|
result.get(appId),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")");
|
||||||
|
|
||||||
|
/* test 2: delete EMPTY streams group */
|
||||||
|
stopKSApp(appId, streams, service);
|
||||||
|
final Map<String, Throwable> emptyGrpRes = new HashMap<>();
|
||||||
|
output = ToolsTestUtils.grabConsoleOutput(() -> emptyGrpRes.putAll(service.deleteGroups()));
|
||||||
|
|
||||||
|
assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."),
|
||||||
|
"The streams group could not be deleted as expected");
|
||||||
|
assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId + "') was successful."),
|
||||||
|
"The internal topics could not be deleted as expected.");
|
||||||
|
assertEquals(1, emptyGrpRes.size());
|
||||||
|
assertTrue(emptyGrpRes.containsKey(appId));
|
||||||
|
assertNull(emptyGrpRes.get(appId), "The streams group could not be deleted as expected");
|
||||||
|
assertTrue(service.retrieveInternalTopics(List.of(appId)).isEmpty());
|
||||||
|
|
||||||
|
/* test 3: delete an already deleted streams group (non-existing group) */
|
||||||
|
result = service.deleteGroups();
|
||||||
|
assertEquals(1, result.size());
|
||||||
|
assertNotNull(result.get(appId));
|
||||||
|
assertInstanceOf(IllegalArgumentException.class,
|
||||||
|
result.get(appId),
|
||||||
|
"The expected error was not detected while deleting streams group");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteMultipleGroup() throws Exception {
|
||||||
|
final String appId1 = generateGroupAppId();
|
||||||
|
final String appId2 = generateGroupAppId();
|
||||||
|
final String appId3 = generateGroupAppId();
|
||||||
|
|
||||||
|
String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};
|
||||||
|
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
|
||||||
|
KafkaStreams streams1 = startKSApp(appId1, service);
|
||||||
|
KafkaStreams streams2 = startKSApp(appId2, service);
|
||||||
|
KafkaStreams streams3 = startKSApp(appId3, service);
|
||||||
|
|
||||||
|
|
||||||
|
/* test 1: delete NON_EMPTY streams groups */
|
||||||
|
final Map<String, Throwable> result = new HashMap<>();
|
||||||
|
String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
|
||||||
|
|
||||||
|
assertTrue(output.contains("Group '" + appId1 + "' could not be deleted due to:")
|
||||||
|
&& output.contains("Streams group '" + appId1 + "' is not EMPTY."),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")");
|
||||||
|
assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:")
|
||||||
|
&& output.contains("Streams group '" + appId3 + "' is not EMPTY."),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")");
|
||||||
|
assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:")
|
||||||
|
&& output.contains("Streams group '" + appId2 + "' is not EMPTY."),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")");
|
||||||
|
|
||||||
|
|
||||||
|
assertNotNull(result.get(appId1),
|
||||||
|
"Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
|
||||||
|
assertNotNull(result.get(appId2),
|
||||||
|
"Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
|
||||||
|
assertNotNull(result.get(appId3),
|
||||||
|
"Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
|
||||||
|
|
||||||
|
assertEquals(3, result.size());
|
||||||
|
assertInstanceOf(GroupNotEmptyException.class,
|
||||||
|
result.get(appId1),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")");
|
||||||
|
assertInstanceOf(GroupNotEmptyException.class,
|
||||||
|
result.get(appId2),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")");
|
||||||
|
assertInstanceOf(GroupNotEmptyException.class,
|
||||||
|
result.get(appId3),
|
||||||
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")");
|
||||||
|
|
||||||
|
/* test 2: delete mix of EMPTY and NON_EMPTY streams group */
|
||||||
|
stopKSApp(appId1, streams1, service);
|
||||||
|
final Map<String, Throwable> mixGrpsRes = new HashMap<>();
|
||||||
|
output = ToolsTestUtils.grabConsoleOutput(() -> mixGrpsRes.putAll(service.deleteGroups()));
|
||||||
|
|
||||||
|
assertTrue(output.contains("Deletion of some streams groups failed:"), "The streams groups deletion did not work as expected");
|
||||||
|
assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:")
|
||||||
|
&& output.contains("Streams group '" + appId2 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")");
|
||||||
|
assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:")
|
||||||
|
&& output.contains("Streams group '" + appId3 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")");
|
||||||
|
assertTrue(output.contains("These streams groups were deleted successfully: '" + appId1 + "'"),
|
||||||
|
"The streams groups deletion did not work as expected");
|
||||||
|
assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId1 + "') was successful."),
|
||||||
|
"The internal topics could not be deleted as expected");
|
||||||
|
|
||||||
|
assertEquals(3, mixGrpsRes.size());
|
||||||
|
assertNull(mixGrpsRes.get(appId1));
|
||||||
|
assertNotNull(mixGrpsRes.get(appId2));
|
||||||
|
assertNotNull(mixGrpsRes.get(appId3));
|
||||||
|
assertTrue(service.retrieveInternalTopics(List.of(appId1)).isEmpty());
|
||||||
|
assertFalse(service.retrieveInternalTopics(List.of(appId2, appId3)).isEmpty());
|
||||||
|
|
||||||
|
/* test 3: delete all groups */
|
||||||
|
stopKSApp(appId2, streams2, service);
|
||||||
|
stopKSApp(appId3, streams3, service);
|
||||||
|
|
||||||
|
final Map<String, Throwable> allGrpsRes = new HashMap<>();
|
||||||
|
output = ToolsTestUtils.grabConsoleOutput(() -> allGrpsRes.putAll(service.deleteGroups()));
|
||||||
|
|
||||||
|
assertTrue(output.contains("Deletion of requested streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") |
|
||||||
|
output.contains("Deletion of requested streams groups ('" + appId3 + "', '" + appId2 + "') was successful."),
|
||||||
|
"The streams groups deletion did not work as expected");
|
||||||
|
assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") |
|
||||||
|
output.contains("Deletion of associated internal topics of the streams groups ('" + appId3 + "', '" + appId2 + "') was successful."),
|
||||||
|
"The internal topics could not be deleted as expected");
|
||||||
|
|
||||||
|
assertEquals(2, allGrpsRes.size());
|
||||||
|
assertNull(allGrpsRes.get(appId2));
|
||||||
|
assertNull(allGrpsRes.get(appId3));
|
||||||
|
assertTrue(service.retrieveInternalTopics(List.of(appId1, appId2, appId3)).isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteAllGroupsAfterVersionDowngrade() throws Exception {
|
||||||
|
final String appId = generateGroupAppId();
|
||||||
|
String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};
|
||||||
|
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
|
||||||
|
try (KafkaStreams streams = startKSApp(appId, service)) {
|
||||||
|
stopKSApp(appId, streams, service);
|
||||||
|
// downgrade the streams.version to 0
|
||||||
|
updateStreamsGroupProtocol((short) 0);
|
||||||
|
final Map<String, Throwable> result = new HashMap<>();
|
||||||
|
String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
|
||||||
|
|
||||||
|
assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."),
|
||||||
|
"The streams group could not be deleted as expected");
|
||||||
|
assertTrue(output.contains("Retrieving internal topics is not supported by the broker version. " +
|
||||||
|
"Use 'kafka-topics.sh' to list and delete the group's internal topics."));
|
||||||
|
assertEquals(1, result.size());
|
||||||
|
assertTrue(result.containsKey(appId));
|
||||||
|
assertNull(result.get(appId), "The streams group could not be deleted as expected");
|
||||||
|
assertTrue(service.retrieveInternalTopics(List.of(appId)).isEmpty());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
// upgrade back the streams.version to 1
|
||||||
|
updateStreamsGroupProtocol((short) 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateStreamsGroupProtocol(short version) {
|
||||||
|
try (Admin admin = cluster.createAdminClient()) {
|
||||||
|
Map<String, FeatureUpdate> updates = Utils.mkMap(
|
||||||
|
Utils.mkEntry("streams.version", new FeatureUpdate(version, version == 0 ? FeatureUpdate.UpgradeType.SAFE_DOWNGRADE : FeatureUpdate.UpgradeType.UPGRADE)));
|
||||||
|
admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
|
||||||
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Properties createStreamsConfig(String bootstrapServers, String appId) {
|
||||||
|
Properties streamsConfig = new Properties();
|
||||||
|
|
||||||
|
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
|
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
|
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
|
||||||
|
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
|
||||||
|
streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
|
||||||
|
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
|
||||||
|
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
|
||||||
|
return streamsConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
|
||||||
|
StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
|
||||||
|
return new StreamsGroupCommand.StreamsGroupService(
|
||||||
|
opts,
|
||||||
|
Map.of(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private KafkaStreams startKSApp(String appId, StreamsGroupCommand.StreamsGroupService service) throws Exception {
|
||||||
|
String inputTopic = generateRandomTopicId(INPUT_TOPIC_PREFIX);
|
||||||
|
String outputTopic = generateRandomTopicId(OUTPUT_TOPIC_PREFIX);
|
||||||
|
StreamsBuilder builder = builder(inputTopic, outputTopic);
|
||||||
|
produceMessages(inputTopic);
|
||||||
|
|
||||||
|
final KStream<String, String> inputStream = builder.stream(inputTopic);
|
||||||
|
|
||||||
|
final AtomicInteger recordCount = new AtomicInteger(0);
|
||||||
|
final KTable<String, String> valueCounts = inputStream
|
||||||
|
.groupByKey()
|
||||||
|
.aggregate(
|
||||||
|
() -> "()",
|
||||||
|
(key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
|
||||||
|
Materialized.as("aggregated_value"));
|
||||||
|
|
||||||
|
valueCounts.toStream().peek((key, value) -> {
|
||||||
|
if (recordCount.incrementAndGet() > RECORD_TOTAL) {
|
||||||
|
throw new IllegalStateException("Crash on the " + RECORD_TOTAL + " record");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
KafkaStreams streams = IntegrationTestUtils.getStartedStreams(createStreamsConfig(bootstrapServers, appId), builder, true);
|
||||||
|
|
||||||
|
TestUtils.waitForCondition(
|
||||||
|
() -> !service.collectGroupMembers(appId).isEmpty(),
|
||||||
|
"The group did not initialize as expected."
|
||||||
|
);
|
||||||
|
TestUtils.waitForCondition(
|
||||||
|
() -> checkGroupState(service, appId, GroupState.STABLE),
|
||||||
|
"The group did not become stable as expected."
|
||||||
|
);
|
||||||
|
TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL,
|
||||||
|
"Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get());
|
||||||
|
|
||||||
|
return streams;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopKSApp(String appId, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
|
||||||
|
if (streams != null) {
|
||||||
|
KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
|
||||||
|
closeOptions.timeout(Duration.ofSeconds(30));
|
||||||
|
closeOptions.leaveGroup(true);
|
||||||
|
streams.close(closeOptions);
|
||||||
|
streams.cleanUp();
|
||||||
|
|
||||||
|
TestUtils.waitForCondition(
|
||||||
|
() -> checkGroupState(service, appId, EMPTY),
|
||||||
|
"The group did not become empty as expected."
|
||||||
|
);
|
||||||
|
TestUtils.waitForCondition(
|
||||||
|
() -> service.collectGroupMembers(appId).isEmpty(),
|
||||||
|
"The group size is not zero as expected."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String generateRandomTopicId(String prefix) {
|
||||||
|
return prefix + TestUtils.randomString(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String generateGroupAppId() {
|
||||||
|
return APP_ID_PREFIX + TestUtils.randomString(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkGroupState(StreamsGroupCommand.StreamsGroupService service, String groupId, GroupState state) throws Exception {
|
||||||
|
return Objects.equals(service.collectGroupState(groupId), state);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void produceMessages(final String topic) {
|
||||||
|
List<KeyValueTimestamp<String, String>> data = new ArrayList<>(RECORD_TOTAL);
|
||||||
|
for (long v = 0; v < RECORD_TOTAL; ++v) {
|
||||||
|
data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", cluster.time.milliseconds()));
|
||||||
|
}
|
||||||
|
|
||||||
|
IntegrationTestUtils.produceSynchronously(
|
||||||
|
TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class),
|
||||||
|
false,
|
||||||
|
topic,
|
||||||
|
Optional.empty(),
|
||||||
|
data
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamsBuilder builder(String inputTopic, String outputTopic) {
|
||||||
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
|
||||||
|
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
||||||
|
.groupBy((key, value) -> value)
|
||||||
|
.count()
|
||||||
|
.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,9 @@
|
||||||
package org.apache.kafka.tools.streams;
|
package org.apache.kafka.tools.streams;
|
||||||
|
|
||||||
import org.apache.kafka.clients.admin.Admin;
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
|
||||||
|
import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
|
||||||
|
import org.apache.kafka.clients.admin.DeleteTopicsResult;
|
||||||
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
|
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
|
||||||
import org.apache.kafka.clients.admin.GroupListing;
|
import org.apache.kafka.clients.admin.GroupListing;
|
||||||
import org.apache.kafka.clients.admin.KafkaAdminClient;
|
import org.apache.kafka.clients.admin.KafkaAdminClient;
|
||||||
|
@ -40,6 +43,7 @@ import org.apache.kafka.test.TestUtils;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.ArgumentMatchers;
|
import org.mockito.ArgumentMatchers;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -54,10 +58,15 @@ import joptsimple.OptionException;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class StreamsGroupCommandTest {
|
public class StreamsGroupCommandTest {
|
||||||
|
@ -84,6 +93,7 @@ public class StreamsGroupCommandTest {
|
||||||
foundGroups[0] = new HashSet<>(service.listStreamsGroups());
|
foundGroups[0] = new HashSet<>(service.listStreamsGroups());
|
||||||
return Objects.equals(expectedGroups, foundGroups[0]);
|
return Objects.equals(expectedGroups, foundGroups[0]);
|
||||||
}, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + ".");
|
}, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + ".");
|
||||||
|
|
||||||
service.close();
|
service.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,6 +146,7 @@ public class StreamsGroupCommandTest {
|
||||||
foundListing[0] = new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.STABLE)));
|
foundListing[0] = new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.STABLE)));
|
||||||
return Objects.equals(expectedListingStable, foundListing[0]);
|
return Objects.equals(expectedListingStable, foundListing[0]);
|
||||||
}, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]);
|
}, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]);
|
||||||
|
|
||||||
service.close();
|
service.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,6 +172,7 @@ public class StreamsGroupCommandTest {
|
||||||
when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
|
when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
|
||||||
StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient);
|
StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient);
|
||||||
assertEquals(exp, service.getDescribeGroup(firstGroup));
|
assertEquals(exp, service.getDescribeGroup(firstGroup));
|
||||||
|
|
||||||
service.close();
|
service.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,6 +219,7 @@ public class StreamsGroupCommandTest {
|
||||||
Map<TopicPartition, StreamsGroupCommand.OffsetsInfo> lags = service.getOffsets(x);
|
Map<TopicPartition, StreamsGroupCommand.OffsetsInfo> lags = service.getOffsets(x);
|
||||||
assertEquals(1, lags.size());
|
assertEquals(1, lags.size());
|
||||||
assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L), Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0)));
|
assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L), Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0)));
|
||||||
|
|
||||||
service.close();
|
service.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,6 +232,109 @@ public class StreamsGroupCommandTest {
|
||||||
assertTrue(StreamsGroupCommand.StreamsGroupService.isGroupStateValid(GroupState.UNKNOWN, 1));
|
assertTrue(StreamsGroupCommand.StreamsGroupService.isGroupStateValid(GroupState.UNKNOWN, 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetrieveInternalTopics() {
|
||||||
|
Admin adminClient = mock(KafkaAdminClient.class);
|
||||||
|
String groupId = "foo-group";
|
||||||
|
List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--delete"));
|
||||||
|
List<String> sourceTopics = List.of("source-topic1", "source-topic2");
|
||||||
|
List<String> repartitionSinkTopics = List.of("rep-sink-topic1", "rep-sink-topic2");
|
||||||
|
Map<String, StreamsGroupSubtopologyDescription.TopicInfo> stateChangelogTopics = Map.of(
|
||||||
|
groupId + "-1-changelog", mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
|
||||||
|
"some-pre-fix" + "-changelog", mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
|
||||||
|
groupId + "-2-changelog", mock(StreamsGroupSubtopologyDescription.TopicInfo.class));
|
||||||
|
Map<String, StreamsGroupSubtopologyDescription.TopicInfo> repartitionSourceTopics = Map.of(
|
||||||
|
groupId + "-1-repartition", mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
|
||||||
|
groupId + "-some-thing", mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
|
||||||
|
groupId + "-2-repartition", mock(StreamsGroupSubtopologyDescription.TopicInfo.class));
|
||||||
|
|
||||||
|
|
||||||
|
Map<String, StreamsGroupDescription> resultMap = new HashMap<>();
|
||||||
|
resultMap.put(groupId, new StreamsGroupDescription(
|
||||||
|
groupId,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
List.of(new StreamsGroupSubtopologyDescription("subtopology1", sourceTopics, repartitionSinkTopics, stateChangelogTopics, repartitionSourceTopics)),
|
||||||
|
List.of(),
|
||||||
|
GroupState.DEAD,
|
||||||
|
new Node(0, "localhost", 9092),
|
||||||
|
null));
|
||||||
|
DescribeStreamsGroupsResult result = mock(DescribeStreamsGroupsResult.class);
|
||||||
|
when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
|
||||||
|
when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
|
||||||
|
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient);
|
||||||
|
Map<String, List<String>> internalTopics = service.retrieveInternalTopics(List.of(groupId));
|
||||||
|
|
||||||
|
assertNotNull(internalTopics.get(groupId));
|
||||||
|
assertEquals(4, internalTopics.get(groupId).size());
|
||||||
|
assertEquals(new HashSet<>(List.of(groupId + "-1-changelog", groupId + "-2-changelog", groupId + "-1-repartition", groupId + "-2-repartition")),
|
||||||
|
new HashSet<>(internalTopics.get(groupId)));
|
||||||
|
assertFalse(internalTopics.get(groupId).stream().anyMatch(List.of("some-pre-fix-changelog", groupId + "-some-thing")::contains));
|
||||||
|
assertFalse(internalTopics.get(groupId).stream().anyMatch(sourceTopics::contains));
|
||||||
|
assertFalse(internalTopics.get(groupId).stream().anyMatch(repartitionSinkTopics::contains));
|
||||||
|
|
||||||
|
service.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteStreamsGroup() {
|
||||||
|
Admin adminClient = mock(KafkaAdminClient.class);
|
||||||
|
String groupId = "foo-group";
|
||||||
|
List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--delete"));
|
||||||
|
|
||||||
|
DeleteStreamsGroupsResult deleteStreamsGroupsResult = mock(DeleteStreamsGroupsResult.class);
|
||||||
|
when(adminClient.deleteStreamsGroups(eq(List.of(groupId)), any(DeleteStreamsGroupsOptions.class))).thenReturn(deleteStreamsGroupsResult);
|
||||||
|
when(deleteStreamsGroupsResult.deletedGroups()).thenReturn(Map.of(groupId, KafkaFuture.completedFuture(null)));
|
||||||
|
DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class);
|
||||||
|
when(deleteTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
|
||||||
|
when(adminClient.deleteTopics(ArgumentMatchers.anyCollection())).thenReturn(deleteTopicsResult);
|
||||||
|
DescribeStreamsGroupsResult describeStreamsGroupsResult = mock(DescribeStreamsGroupsResult.class);
|
||||||
|
when(describeStreamsGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId, mock(StreamsGroupDescription.class))));
|
||||||
|
when(adminClient.describeStreamsGroups(any())).thenReturn(describeStreamsGroupsResult);
|
||||||
|
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
|
||||||
|
when(adminClient.listGroups(any())).thenReturn(listGroupsResult);
|
||||||
|
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(new GroupListing(groupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY)))));
|
||||||
|
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient);
|
||||||
|
service.deleteGroups();
|
||||||
|
|
||||||
|
verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class));
|
||||||
|
verify(adminClient, times(1)).deleteStreamsGroups(eq(List.of(groupId)), any(DeleteStreamsGroupsOptions.class));
|
||||||
|
verify(adminClient, times(1)).describeStreamsGroups(any());
|
||||||
|
// because of having 0 internal topics, we do not expect deleteTopics to be called
|
||||||
|
verify(adminClient, times(0)).deleteTopics(ArgumentMatchers.anyCollection());
|
||||||
|
|
||||||
|
service.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteNonStreamsGroup() {
|
||||||
|
Admin adminClient = mock(KafkaAdminClient.class);
|
||||||
|
String groupId = "foo-group";
|
||||||
|
List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--delete"));
|
||||||
|
|
||||||
|
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
|
||||||
|
when(adminClient.listGroups(any())).thenReturn(listGroupsResult);
|
||||||
|
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of()));
|
||||||
|
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient);
|
||||||
|
Map<String, Throwable> result = service.deleteGroups();
|
||||||
|
|
||||||
|
assertNotNull(result.get(groupId));
|
||||||
|
assertEquals(result.get(groupId).getMessage(),
|
||||||
|
"Group '" + groupId + "' does not exist or is not a streams group.");
|
||||||
|
assertInstanceOf(IllegalArgumentException.class, result.get(groupId));
|
||||||
|
verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class));
|
||||||
|
// we do not expect any further API to be called
|
||||||
|
verify(adminClient, times(0)).deleteStreamsGroups(eq(List.of(groupId)), any(DeleteStreamsGroupsOptions.class));
|
||||||
|
verify(adminClient, times(0)).describeStreamsGroups(any());
|
||||||
|
verify(adminClient, times(0)).deleteTopics(ArgumentMatchers.anyCollection());
|
||||||
|
|
||||||
|
service.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGroupStatesFromString() {
|
public void testGroupStatesFromString() {
|
||||||
Set<GroupState> result = StreamsGroupCommand.groupStatesFromString("empty");
|
Set<GroupState> result = StreamsGroupCommand.groupStatesFromString("empty");
|
||||||
|
|
Loading…
Reference in New Issue