mirror of https://github.com/apache/kafka.git
KAFKA-18287: Add support for kafka-streams-groups.sh --list (#19422)
Implement the core of kafka-streams-groups.sh for `KIP-1071` - Implement `--list` and its options: (only `--state`) Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
parent
2a370ed721
commit
fa62bce63d
|
@ -0,0 +1,17 @@
|
||||||
|
#!/bin/bash
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.streams.StreamsGroupCommand "$@"
|
|
@ -0,0 +1,168 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.tools.streams;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
import org.apache.kafka.clients.admin.GroupListing;
|
||||||
|
import org.apache.kafka.clients.admin.ListGroupsOptions;
|
||||||
|
import org.apache.kafka.clients.admin.ListGroupsResult;
|
||||||
|
import org.apache.kafka.common.GroupState;
|
||||||
|
import org.apache.kafka.common.GroupType;
|
||||||
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
import org.apache.kafka.server.util.CommandLineUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import joptsimple.OptionException;
|
||||||
|
|
||||||
|
public class StreamsGroupCommand {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
|
||||||
|
try {
|
||||||
|
opts.checkArgs();
|
||||||
|
|
||||||
|
// should have exactly one action
|
||||||
|
long numberOfActions = Stream.of(opts.listOpt).filter(opts.options::has).count();
|
||||||
|
if (numberOfActions != 1)
|
||||||
|
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list.");
|
||||||
|
|
||||||
|
run(opts);
|
||||||
|
} catch (OptionException e) {
|
||||||
|
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void run(StreamsGroupCommandOptions opts) {
|
||||||
|
try (StreamsGroupService streamsGroupService = new StreamsGroupService(opts, Map.of())) {
|
||||||
|
if (opts.options.has(opts.listOpt)) {
|
||||||
|
streamsGroupService.listGroups();
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("Unknown action!");
|
||||||
|
}
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
|
||||||
|
} catch (Throwable e) {
|
||||||
|
printError("Executing streams group command failed due to " + e.getMessage(), Optional.of(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Set<GroupState> groupStatesFromString(String input) {
|
||||||
|
Set<GroupState> parsedStates =
|
||||||
|
Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
|
||||||
|
Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.STREAMS);
|
||||||
|
if (!validStates.containsAll(parsedStates)) {
|
||||||
|
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " +
|
||||||
|
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
|
||||||
|
}
|
||||||
|
return parsedStates;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void printError(String msg, Optional<Throwable> e) {
|
||||||
|
System.out.println("\nError: " + msg);
|
||||||
|
e.ifPresent(Throwable::printStackTrace);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Visibility for testing
|
||||||
|
static class StreamsGroupService implements AutoCloseable {
|
||||||
|
final StreamsGroupCommandOptions opts;
|
||||||
|
private final Admin adminClient;
|
||||||
|
|
||||||
|
public StreamsGroupService(StreamsGroupCommandOptions opts, Map<String, String> configOverrides) {
|
||||||
|
this.opts = opts;
|
||||||
|
try {
|
||||||
|
this.adminClient = createAdminClient(configOverrides);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public StreamsGroupService(StreamsGroupCommandOptions opts, Admin adminClient) {
|
||||||
|
this.opts = opts;
|
||||||
|
this.adminClient = adminClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void listGroups() throws ExecutionException, InterruptedException {
|
||||||
|
if (opts.options.has(opts.stateOpt)) {
|
||||||
|
String stateValue = opts.options.valueOf(opts.stateOpt);
|
||||||
|
Set<GroupState> states = (stateValue == null || stateValue.isEmpty())
|
||||||
|
? Set.of()
|
||||||
|
: groupStatesFromString(stateValue);
|
||||||
|
List<GroupListing> listings = listStreamsGroupsInStates(states);
|
||||||
|
printGroupInfo(listings);
|
||||||
|
} else
|
||||||
|
listStreamsGroups().forEach(System.out::println);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> listStreamsGroups() {
|
||||||
|
try {
|
||||||
|
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
|
||||||
|
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
|
||||||
|
.withTypes(Set.of(GroupType.STREAMS)));
|
||||||
|
Collection<GroupListing> listings = result.all().get();
|
||||||
|
return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
|
||||||
|
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
|
||||||
|
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
|
||||||
|
.withTypes(Set.of(GroupType.STREAMS))
|
||||||
|
.inGroupStates(states));
|
||||||
|
return new ArrayList<>(result.all().get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void printGroupInfo(List<GroupListing> groups) {
|
||||||
|
// find proper columns width
|
||||||
|
int maxGroupLen = 15;
|
||||||
|
for (GroupListing group : groups) {
|
||||||
|
maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
|
||||||
|
}
|
||||||
|
System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE");
|
||||||
|
for (GroupListing group : groups) {
|
||||||
|
String groupId = group.groupId();
|
||||||
|
String state = group.groupState().orElse(GroupState.UNKNOWN).toString();
|
||||||
|
System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
adminClient.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Admin createAdminClient(Map<String, String> configOverrides) throws IOException {
|
||||||
|
Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties();
|
||||||
|
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
|
||||||
|
props.putAll(configOverrides);
|
||||||
|
return Admin.create(props);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,70 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.tools.streams;
|
||||||
|
|
||||||
|
import org.apache.kafka.server.util.CommandDefaultOptions;
|
||||||
|
import org.apache.kafka.server.util.CommandLineUtils;
|
||||||
|
|
||||||
|
import joptsimple.OptionSpec;
|
||||||
|
|
||||||
|
public class StreamsGroupCommandOptions extends CommandDefaultOptions {
|
||||||
|
public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
|
||||||
|
public static final String LIST_DOC = "List all streams groups.";
|
||||||
|
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
|
||||||
|
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes.";
|
||||||
|
public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
|
||||||
|
public static final String STATE_DOC = "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " +
|
||||||
|
"Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and Dead.";
|
||||||
|
|
||||||
|
public final OptionSpec<String> bootstrapServerOpt;
|
||||||
|
public final OptionSpec<Void> listOpt;
|
||||||
|
public final OptionSpec<Long> timeoutMsOpt;
|
||||||
|
public final OptionSpec<String> commandConfigOpt;
|
||||||
|
public final OptionSpec<String> stateOpt;
|
||||||
|
|
||||||
|
|
||||||
|
public StreamsGroupCommandOptions(String[] args) {
|
||||||
|
super(args);
|
||||||
|
|
||||||
|
bootstrapServerOpt = parser.accepts("bootstrap-server", BOOTSTRAP_SERVER_DOC)
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("server to connect to")
|
||||||
|
.ofType(String.class);
|
||||||
|
listOpt = parser.accepts("list", LIST_DOC);
|
||||||
|
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("timeout (ms)")
|
||||||
|
.ofType(Long.class)
|
||||||
|
.defaultsTo(5000L);
|
||||||
|
commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("command config property file")
|
||||||
|
.ofType(String.class);
|
||||||
|
stateOpt = parser.accepts("state", STATE_DOC)
|
||||||
|
.availableIf(listOpt)
|
||||||
|
.withOptionalArg()
|
||||||
|
.ofType(String.class);
|
||||||
|
|
||||||
|
options = parser.parse(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkArgs() {
|
||||||
|
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list streams groups.");
|
||||||
|
|
||||||
|
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,185 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.tools.streams;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
|
import org.apache.kafka.clients.admin.GroupListing;
|
||||||
|
import org.apache.kafka.clients.admin.KafkaAdminClient;
|
||||||
|
import org.apache.kafka.clients.admin.ListGroupsOptions;
|
||||||
|
import org.apache.kafka.clients.admin.ListGroupsResult;
|
||||||
|
import org.apache.kafka.clients.admin.MockAdminClient;
|
||||||
|
import org.apache.kafka.common.GroupState;
|
||||||
|
import org.apache.kafka.common.GroupType;
|
||||||
|
import org.apache.kafka.common.KafkaFuture;
|
||||||
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import joptsimple.OptionException;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class StreamsGroupCommandTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListStreamsGroups() throws Exception {
|
||||||
|
String firstGroup = "first-group";
|
||||||
|
String secondGroup = "second-group";
|
||||||
|
String bootstrapServer = "localhost:9092";
|
||||||
|
|
||||||
|
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"};
|
||||||
|
Admin adminClient = mock(KafkaAdminClient.class);
|
||||||
|
ListGroupsResult result = mock(ListGroupsResult.class);
|
||||||
|
when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
|
||||||
|
new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)),
|
||||||
|
new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY))
|
||||||
|
)));
|
||||||
|
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient);
|
||||||
|
Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup));
|
||||||
|
|
||||||
|
final Set[] foundGroups = new Set[]{Set.of()};
|
||||||
|
TestUtils.waitForCondition(() -> {
|
||||||
|
foundGroups[0] = new HashSet<>(service.listStreamsGroups());
|
||||||
|
return Objects.equals(expectedGroups, foundGroups[0]);
|
||||||
|
}, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + ".");
|
||||||
|
service.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListWithUnrecognizedOption() {
|
||||||
|
String bootstrapServer = "localhost:9092";
|
||||||
|
String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"};
|
||||||
|
final Exception exception = assertThrows(OptionException.class, () -> {
|
||||||
|
getStreamsGroupService(cgcArgs, new MockAdminClient());
|
||||||
|
});
|
||||||
|
assertEquals("frivolous-nonsense is not a recognized option", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListStreamsGroupsWithStates() throws Exception {
|
||||||
|
String firstGroup = "first-group";
|
||||||
|
String secondGroup = "second-group";
|
||||||
|
String bootstrapServer = "localhost:9092";
|
||||||
|
|
||||||
|
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"};
|
||||||
|
Admin adminClient = mock(KafkaAdminClient.class);
|
||||||
|
ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
|
||||||
|
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
|
||||||
|
new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)),
|
||||||
|
new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY))
|
||||||
|
)));
|
||||||
|
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
|
||||||
|
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient);
|
||||||
|
Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
|
||||||
|
new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)),
|
||||||
|
new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY))));
|
||||||
|
|
||||||
|
final Set[] foundListing = new Set[]{Set.of()};
|
||||||
|
TestUtils.waitForCondition(() -> {
|
||||||
|
foundListing[0] = new HashSet<>(service.listStreamsGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values()))));
|
||||||
|
return Objects.equals(expectedListing, foundListing[0]);
|
||||||
|
}, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]);
|
||||||
|
|
||||||
|
ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
|
||||||
|
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of(
|
||||||
|
new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))
|
||||||
|
)));
|
||||||
|
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
|
||||||
|
Set<GroupListing> expectedListingStable = Set.of(
|
||||||
|
new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)));
|
||||||
|
|
||||||
|
foundListing[0] = Set.of();
|
||||||
|
|
||||||
|
TestUtils.waitForCondition(() -> {
|
||||||
|
foundListing[0] = new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.STABLE)));
|
||||||
|
return Objects.equals(expectedListingStable, foundListing[0]);
|
||||||
|
}, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]);
|
||||||
|
service.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGroupStatesFromString() {
|
||||||
|
Set<GroupState> result = StreamsGroupCommand.groupStatesFromString("empty");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.EMPTY)), result);
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("EMPTY");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.EMPTY)), result);
|
||||||
|
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("notready");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.NOT_READY)), result);
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("notReady");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.NOT_READY)), result);
|
||||||
|
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("assigning");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.ASSIGNING)), result);
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("ASSIGNING");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.ASSIGNING)), result);
|
||||||
|
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("RECONCILING");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.RECONCILING)), result);
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("reconCILING");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.RECONCILING)), result);
|
||||||
|
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("STABLE");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.STABLE)), result);
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("stable");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.STABLE)), result);
|
||||||
|
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("DEAD");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.DEAD)), result);
|
||||||
|
result = StreamsGroupCommand.groupStatesFromString("dead");
|
||||||
|
assertEquals(new HashSet<>(List.of(GroupState.DEAD)), result);
|
||||||
|
|
||||||
|
assertThrow("preparingRebalance");
|
||||||
|
assertThrow("completingRebalance");
|
||||||
|
assertThrow("bad, wrong");
|
||||||
|
assertThrow(" bad, Stable");
|
||||||
|
assertThrow(" , ,");
|
||||||
|
}
|
||||||
|
|
||||||
|
StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args, Admin adminClient) {
|
||||||
|
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
|
||||||
|
return new StreamsGroupCommand.StreamsGroupService(opts, adminClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertThrow(final String wrongState) {
|
||||||
|
final Set<String> validStates = new HashSet<>(Arrays.asList("Assigning", "Dead", "Empty", "Reconciling", "Stable", "NotReady"));
|
||||||
|
|
||||||
|
final Exception exception = assertThrows(IllegalArgumentException.class, () -> StreamsGroupCommand.groupStatesFromString(wrongState));
|
||||||
|
|
||||||
|
assertTrue(exception.getMessage().contains(" Valid states are: "));
|
||||||
|
|
||||||
|
final String[] exceptionMessage = exception.getMessage().split(" Valid states are: ");
|
||||||
|
assertEquals("Invalid state list '" + wrongState + "'.", exceptionMessage[0]);
|
||||||
|
assertEquals(Arrays.stream(exceptionMessage[1].split(","))
|
||||||
|
.map(String::trim)
|
||||||
|
.collect(Collectors.toSet()), validStates);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue