KAFKA-14589 ConsumerGroupCommand rewritten in java (#14471)

This PR contains changes to rewrite ConsumerGroupCommand in java and transfer it to tools module

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nikolay 2024-03-20 10:34:45 +03:00 committed by GitHub
parent 34d365fd8a
commit b6183a4134
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1869 additions and 1654 deletions

View File

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.group.ConsumerGroupCommand "$@"

View File

@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.admin.ConsumerGroupCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.group.ConsumerGroupCommand %*

View File

@ -1980,6 +1980,9 @@ project(':tools') {
implementation project(':log4j-appender')
implementation project(':tools:tools-api')
implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv
implementation libs.jacksonJDK8Datatypes
implementation libs.slf4jApi
implementation libs.log4j
implementation libs.joptSimple

File diff suppressed because it is too large Load Diff

View File

@ -69,7 +69,7 @@ object ToolsUtils {
/**
* This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
* It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`.
* Can be removed once [[kafka.admin.ConsumerGroupCommand]] and [[kafka.tools.ConsoleProducer]] are migrated.
* Can be removed once [[kafka.tools.ConsoleProducer]] are migrated.
*
* @param parser Command line options parser.
* @param message Error message.

View File

@ -16,9 +16,11 @@
*/
package org.apache.kafka.tools;
import joptsimple.OptionParser;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.PrintStream;
import java.util.Arrays;
@ -155,4 +157,17 @@ public class ToolsUtils {
return res;
}
/**
* This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
* It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`.
* Can be removed once [[kafka.tools.ConsoleConsumer]]
* and [[kafka.tools.ConsoleProducer]] are migrated.
*
* @param parser Command line options parser.
* @param message Error message.
*/
public static void printUsageAndExit(OptionParser parser, String message) {
CommandLineUtils.printUsageAndExit(parser, message);
throw new AssertionError("printUsageAndExit should not return, but it did.");
}
}

View File

@ -31,89 +31,99 @@ import static org.apache.kafka.common.utils.Utils.join;
import static org.apache.kafka.tools.ToolsUtils.minus;
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
public static final String GROUP_DOC = "The consumer group we wish to act on.";
public static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
private static final String GROUP_DOC = "The consumer group we wish to act on.";
private static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
"Reset-offsets also supports multiple topic inputs.";
public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process.";
public static final String LIST_DOC = "List all consumer groups.";
public static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group.";
public static final String ALL_GROUPS_DOC = "Apply to all consumer groups.";
public static final String NL = System.lineSeparator();
public static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " +
private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process.";
private static final String LIST_DOC = "List all consumer groups.";
private static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group.";
private static final String ALL_GROUPS_DOC = "Apply to all consumer groups.";
private static final String NL = System.lineSeparator();
private static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " +
"over the entire consumer group. For instance --group g1 --group g2";
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
private static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " +
"or is going through some changes).";
public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer.";
public static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL +
private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL +
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " +
"Additionally, the --export option is used to export the results to a CSV format." + NL +
"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL +
"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'.";
public static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets.";
public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
public static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets.";
public static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset.";
public static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file.";
public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'";
public static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'";
public static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset.";
public static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset.";
public static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset.";
public static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.";
public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only." + NL +
private static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
private static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets.";
private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset.";
private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file.";
private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'";
private static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'";
private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset.";
private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset.";
private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset.";
private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.";
private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only." + NL +
"Example: --bootstrap-server localhost:9092 --describe --group group1 --members";
public static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " +
private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " +
"with '--offsets'/'--members'/'--state' and '--bootstrap-server' options only." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose";
public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset lag. " +
private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset lag. " +
"This is the default sub-action of and may be used with '--describe' and '--bootstrap-server' options only." + NL +
"Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets";
public static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL +
private static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL +
"Example: --bootstrap-server localhost:9092 --describe --group group1 --state" + NL +
"When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states." + NL +
"Example: --bootstrap-server localhost:9092 --list --state stable,empty" + NL +
"This option may be used with '--describe', '--list' and '--bootstrap-server' options only.";
public static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics.";
private static final String TYPE_DOC = "When specified with '--list', it displays the types of all the groups. It can also be used to list groups with specific types." + NL +
"Example: --bootstrap-server localhost:9092 --list --type classic,consumer" + NL +
"This option may be used with the '--list' option only.";
private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics.";
public final OptionSpec<String> bootstrapServerOpt;
public final OptionSpec<String> groupOpt;
public final OptionSpec<String> topicOpt;
public final OptionSpec<Void> allTopicsOpt;
public final OptionSpec<Void> listOpt;
public final OptionSpec<Void> describeOpt;
public final OptionSpec<Void> allGroupsOpt;
public final OptionSpec<Void> deleteOpt;
public final OptionSpec<Long> timeoutMsOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<Void> resetOffsetsOpt;
public final OptionSpec<Void> deleteOffsetsOpt;
public final OptionSpec<Void> dryRunOpt;
public final OptionSpec<Void> executeOpt;
public final OptionSpec<Void> exportOpt;
public final OptionSpec<Long> resetToOffsetOpt;
public final OptionSpec<String> resetFromFileOpt;
public final OptionSpec<String> resetToDatetimeOpt;
public final OptionSpec<String> resetByDurationOpt;
public final OptionSpec<Void> resetToEarliestOpt;
public final OptionSpec<Void> resetToLatestOpt;
public final OptionSpec<Void> resetToCurrentOpt;
public final OptionSpec<Long> resetShiftByOpt;
public final OptionSpec<Void> membersOpt;
public final OptionSpec<Void> verboseOpt;
public final OptionSpec<Void> offsetsOpt;
public final OptionSpec<String> stateOpt;
final OptionSpec<String> bootstrapServerOpt;
final OptionSpec<String> groupOpt;
final OptionSpec<String> topicOpt;
final OptionSpec<Void> allTopicsOpt;
final OptionSpec<Void> listOpt;
final OptionSpec<Void> describeOpt;
final OptionSpec<Void> allGroupsOpt;
final OptionSpec<Void> deleteOpt;
final OptionSpec<Long> timeoutMsOpt;
final OptionSpec<String> commandConfigOpt;
final OptionSpec<Void> resetOffsetsOpt;
final OptionSpec<Void> deleteOffsetsOpt;
final OptionSpec<Void> dryRunOpt;
final OptionSpec<Void> executeOpt;
final OptionSpec<Void> exportOpt;
final OptionSpec<Long> resetToOffsetOpt;
final OptionSpec<String> resetFromFileOpt;
final OptionSpec<String> resetToDatetimeOpt;
final OptionSpec<String> resetByDurationOpt;
final OptionSpec<Void> resetToEarliestOpt;
final OptionSpec<Void> resetToLatestOpt;
final OptionSpec<Void> resetToCurrentOpt;
final OptionSpec<Long> resetShiftByOpt;
final OptionSpec<Void> membersOpt;
final OptionSpec<Void> verboseOpt;
final OptionSpec<Void> offsetsOpt;
final OptionSpec<String> stateOpt;
final OptionSpec<String> typeOpt;
public final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
public final Set<OptionSpec<?>> allConsumerGroupLevelOpts;
public final Set<OptionSpec<?>> allResetOffsetScenarioOpts;
public final Set<OptionSpec<?>> allDeleteOffsetsOpts;
final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
final Set<OptionSpec<?>> allConsumerGroupLevelOpts;
final Set<OptionSpec<?>> allResetOffsetScenarioOpts;
final Set<OptionSpec<?>> allDeleteOffsetsOpts;
public ConsumerGroupCommandOptions(String[] args) {
public static ConsumerGroupCommandOptions fromArgs(String[] args) {
ConsumerGroupCommandOptions opts = new ConsumerGroupCommandOptions(args);
opts.checkArgs();
return opts;
}
private ConsumerGroupCommandOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", BOOTSTRAP_SERVER_DOC)
@ -180,6 +190,10 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
.availableIf(describeOpt, listOpt)
.withOptionalArg()
.ofType(String.class);
typeOpt = parser.accepts("type", TYPE_DOC)
.availableIf(listOpt)
.withOptionalArg()
.ofType(String.class);
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
@ -191,7 +205,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
}
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
public void checkArgs() {
void checkArgs() {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
if (options.has(describeOpt)) {

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
public class CsvUtils {
private final static CsvMapper MAPPER = new CsvMapper();
static ObjectReader readerFor(Class<?> clazz) {
return MAPPER.readerFor(clazz).with(getSchema(clazz));
}
static ObjectWriter writerFor(Class<?> clazz) {
return MAPPER.writerFor(clazz).with(getSchema(clazz));
}
private static CsvSchema getSchema(Class<?> clazz) {
String[] fields;
if (CsvRecordWithGroup.class == clazz)
fields = CsvRecordWithGroup.FIELDS;
else if (CsvRecordNoGroup.class == clazz)
fields = CsvRecordNoGroup.FIELDS;
else
throw new IllegalStateException("Unhandled class " + clazz);
return MAPPER.schemaFor(clazz).sortedBy(fields);
}
public static class CsvRecordWithGroup {
public static final String[] FIELDS = new String[] {"group", "topic", "partition", "offset"};
@JsonProperty
private String group;
@JsonProperty
private String topic;
@JsonProperty
private int partition;
@JsonProperty
private long offset;
/**
* Required for jackson.
*/
public CsvRecordWithGroup() {
}
public CsvRecordWithGroup(String group, String topic, int partition, long offset) {
this.group = group;
this.topic = topic;
this.partition = partition;
this.offset = offset;
}
public void setGroup(String group) {
this.group = group;
}
public String getGroup() {
return group;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getPartition() {
return partition;
}
public void setPartition(int partition) {
this.partition = partition;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
}
public static class CsvRecordNoGroup {
public static final String[] FIELDS = new String[]{"topic", "partition", "offset"};
@JsonProperty
private String topic;
@JsonProperty
private int partition;
@JsonProperty
private long offset;
/**
* Required for jackson.
*/
public CsvRecordNoGroup() {
}
public CsvRecordNoGroup(String topic, int partition, long offset) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getPartition() {
return partition;
}
public void setPartition(int partition) {
this.partition = partition;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.common.Node;
class GroupState {
final String group;
final Node coordinator;
final String assignmentStrategy;
final String state;
final int numMembers;
GroupState(String group, Node coordinator, String assignmentStrategy, String state, int numMembers) {
this.group = group;
this.coordinator = coordinator;
this.assignmentStrategy = assignmentStrategy;
this.state = state;
this.numMembers = numMembers;
}
}

View File

@ -21,15 +21,15 @@ import org.apache.kafka.common.TopicPartition;
import java.util.List;
class MemberAssignmentState {
public final String group;
public final String consumerId;
public final String host;
public final String clientId;
public final String groupInstanceId;
public final int numPartitions;
public final List<TopicPartition> assignment;
final String group;
final String consumerId;
final String host;
final String clientId;
final String groupInstanceId;
final int numPartitions;
final List<TopicPartition> assignment;
public MemberAssignmentState(String group, String consumerId, String host, String clientId, String groupInstanceId,
MemberAssignmentState(String group, String consumerId, String host, String clientId, String groupInstanceId,
int numPartitions, List<TopicPartition> assignment) {
this.group = group;
this.consumerId = consumerId;

View File

@ -19,25 +19,23 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.common.Node;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
class PartitionAssignmentState {
public final String group;
public final Optional<Node> coordinator;
public final Optional<String> topic;
public final OptionalInt partition;
public final OptionalLong offset;
public final OptionalLong lag;
public final Optional<String> consumerId;
public final Optional<String> host;
public final Optional<String> clientId;
public final OptionalLong logEndOffset;
final String group;
final Optional<Node> coordinator;
final Optional<String> topic;
final Optional<Integer> partition;
final Optional<Long> offset;
final Optional<Long> lag;
final Optional<String> consumerId;
final Optional<String> host;
final Optional<String> clientId;
final Optional<Long> logEndOffset;
public PartitionAssignmentState(String group, Optional<Node> coordinator, Optional<String> topic,
OptionalInt partition, OptionalLong offset, OptionalLong lag,
PartitionAssignmentState(String group, Optional<Node> coordinator, Optional<String> topic,
Optional<Integer> partition, Optional<Long> offset, Optional<Long> lag,
Optional<String> consumerId, Optional<String> host, Optional<String> clientId,
OptionalLong logEndOffset) {
Optional<Long> logEndOffset) {
this.group = group;
this.coordinator = coordinator;
this.topic = topic;

View File

@ -16,8 +16,8 @@
*/
package org.apache.kafka.tools;
import kafka.utils.TestInfoUtils;
import kafka.server.DynamicConfig;
import kafka.utils.TestInfoUtils;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
@ -210,6 +210,66 @@ public class ToolsTestUtils {
return org.apache.kafka.test.TestUtils.tempFile(sb.toString());
}
/**
* Capture the console output during the execution of the provided function.
*/
public static String grabConsoleOutput(Runnable f) {
ByteArrayOutputStream buf = new ByteArrayOutputStream();
PrintStream out = new PrintStream(buf);
PrintStream out0 = System.out;
System.setOut(out);
try {
f.run();
} finally {
System.setOut(out0);
}
out.flush();
return buf.toString();
}
/**
* Capture the console error during the execution of the provided function.
*/
public static String grabConsoleError(Runnable f) {
ByteArrayOutputStream buf = new ByteArrayOutputStream();
PrintStream err = new PrintStream(buf);
PrintStream err0 = System.err;
System.setErr(err);
try {
f.run();
} finally {
System.setErr(err0);
}
err.flush();
return buf.toString();
}
/**
* Capture both the console output and console error during the execution of the provided function.
*/
public static Tuple2<String, String> grabConsoleOutputAndError(Runnable f) {
ByteArrayOutputStream outBuf = new ByteArrayOutputStream();
ByteArrayOutputStream errBuf = new ByteArrayOutputStream();
PrintStream out = new PrintStream(outBuf);
PrintStream err = new PrintStream(errBuf);
PrintStream out0 = System.out;
PrintStream err0 = System.err;
System.setOut(out);
System.setErr(err);
try {
f.run();
} finally {
System.setOut(out0);
System.setErr(err0);
}
out.flush();
err.flush();
return new Tuple2<>(outBuf.toString(), errBuf.toString());
}
public static class MockExitProcedure implements Exit.Procedure {
private boolean hasExited = false;
private int statusCode;

View File

@ -16,45 +16,30 @@
*/
package org.apache.kafka.tools.consumer.group;
import kafka.admin.ConsumerGroupCommand;
import kafka.api.AbstractAuthorizerIntegrationTest;
import kafka.security.authorizer.AclEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.collection.immutable.Map$;
import scala.collection.JavaConverters;
import java.util.Collections;
import java.util.Properties;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
import static org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest.set;
public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
@SuppressWarnings({"deprecation"})
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"zk", "kraft"})
public void testDescribeGroupCliWithGroupDescribe(String quorum) {
addAndVerifyAcls(set(Collections.singleton(new AccessControlEntry(ClientPrincipal().toString(), AclEntry.WildcardHost(), DESCRIBE, ALLOW))), groupResource());
public void testDescribeGroupCliWithGroupDescribe(String quorum) throws Exception {
addAndVerifyAcls(JavaConverters.asScalaSet(Collections.singleton(new AccessControlEntry(ClientPrincipal().toString(), AclEntry.WildcardHost(), DESCRIBE, ALLOW))).toSet(), groupResource());
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group()};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts = new ConsumerGroupCommand.ConsumerGroupCommandOptions(cgcArgs);
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService(opts, Map$.MODULE$.empty());
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(cgcArgs);
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap());
consumerGroupService.describeGroups();
consumerGroupService.close();
}
private void createTopicWithBrokerPrincipal(String topic) {
// Note the principal builder implementation maps all connections on the
// inter-broker listener to the broker principal.
createTopic(
topic,
1,
1,
new Properties(),
interBrokerListenerName(),
new Properties()
);
}
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.tools.consumer.group;
import kafka.api.BaseConsumerTest;
import kafka.admin.ConsumerGroupCommand;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
@ -41,7 +40,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -130,10 +128,10 @@ public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestH
}
ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
ConsumerGroupCommand.ConsumerGroupCommandOptions opts = new ConsumerGroupCommand.ConsumerGroupCommandOptions(args);
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
ConsumerGroupCommand.ConsumerGroupService service = new ConsumerGroupCommand.ConsumerGroupService(
opts,
asScala(Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)))
Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
);
consumerGroupService.add(0, service);
@ -352,14 +350,4 @@ public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestH
static <T> Seq<T> seq(Collection<T> seq) {
return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
}
@SuppressWarnings("deprecation")
static <K, V> scala.collection.Map<K, V> asScala(Map<K, V> jmap) {
return JavaConverters.mapAsScalaMap(jmap);
}
@SuppressWarnings({"deprecation"})
static <T> scala.collection.immutable.Set<T> set(final Collection<T> set) {
return JavaConverters.asScalaSet(new HashSet<>(set)).toSet();
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools.consumer.group;
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@ -39,15 +38,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.Tuple2;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map$;
import java.util.ArrayList;
import java.util.Arrays;
@ -86,7 +80,7 @@ public class ConsumerGroupServiceTest {
private final Admin admin = mock(Admin.class);
@Test
public void testAdminRequestsForDescribeOffsets() {
public void testAdminRequestsForDescribeOffsets() throws Exception {
String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--group", GROUP, "--describe", "--offsets"};
ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args);
@ -97,10 +91,10 @@ public class ConsumerGroupServiceTest {
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
assertEquals(Some.apply("Stable"), statesAndAssignments._1);
assertTrue(statesAndAssignments._2.isDefined());
assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments._2.get().size());
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
assertEquals(Optional.of("Stable"), statesAndAssignments.v1);
assertTrue(statesAndAssignments.v2.isPresent());
assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments.v2.get().size());
verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any());
verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any());
@ -108,7 +102,7 @@ public class ConsumerGroupServiceTest {
}
@Test
public void testAdminRequestsForDescribeNegativeOffsets() {
public void testAdminRequestsForDescribeNegativeOffsets() throws Exception {
String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--group", GROUP, "--describe", "--offsets"};
ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args);
@ -170,20 +164,15 @@ public class ConsumerGroupServiceTest {
)).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> unassignedTopicPartitions.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
Option<String> state = statesAndAssignments._1;
Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>> assignments = statesAndAssignments._2;
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
Optional<String> state = statesAndAssignments.v1;
Optional<Collection<PartitionAssignmentState>> assignments = statesAndAssignments.v2;
Map<TopicPartition, Optional<Long>> returnedOffsets = new HashMap<>();
assignments.foreach(results -> {
results.foreach(assignment -> {
returnedOffsets.put(
new TopicPartition(assignment.topic().get(), (Integer) assignment.partition().get()),
assignment.offset().isDefined() ? Optional.of((Long) assignment.offset().get()) : Optional.empty());
return null;
});
return null;
});
Map<TopicPartition, Optional<Long>> returnedOffsets = assignments.map(results ->
results.stream().collect(Collectors.toMap(
assignment -> new TopicPartition(assignment.topic.get(), assignment.partition.get()),
assignment -> assignment.offset))
).orElse(Collections.emptyMap());
Map<TopicPartition, Optional<Long>> expectedOffsets = new HashMap<>();
@ -194,7 +183,7 @@ public class ConsumerGroupServiceTest {
expectedOffsets.put(testTopicPartition4, Optional.of(100L));
expectedOffsets.put(testTopicPartition5, Optional.empty());
assertEquals(Some.apply("Stable"), state);
assertEquals(Optional.of("Stable"), state);
assertEquals(expectedOffsets, returnedOffsets);
verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any());
@ -220,9 +209,9 @@ public class ConsumerGroupServiceTest {
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> resetResult = groupService.resetOffsets();
assertEquals(set(Collections.singletonList(GROUP)), resetResult.keySet());
assertEquals(set(TOPIC_PARTITIONS), resetResult.get(GROUP).get().keys().toSet());
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetResult = groupService.resetOffsets();
assertEquals(Collections.singleton(GROUP), resetResult.keySet());
assertEquals(new HashSet<>(TOPIC_PARTITIONS), resetResult.get(GROUP).keySet());
verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any());
verify(admin, times(1)).describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified), any());
@ -230,9 +219,9 @@ public class ConsumerGroupServiceTest {
}
private ConsumerGroupCommand.ConsumerGroupService consumerGroupService(String[] args) {
return new ConsumerGroupCommand.ConsumerGroupService(new kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions(args), Map$.MODULE$.empty()) {
return new ConsumerGroupCommand.ConsumerGroupService(ConsumerGroupCommandOptions.fromArgs(args), Collections.emptyMap()) {
@Override
public Admin createAdminClient(scala.collection.Map<String, String> configOverrides) {
protected Admin createAdminClient(Map<String, String> configOverrides) {
return admin;
}
};
@ -291,9 +280,4 @@ public class ConsumerGroupServiceTest {
private Map<String, ListConsumerGroupOffsetsSpec> listConsumerGroupOffsetsSpec() {
return Collections.singletonMap(GROUP, new ListConsumerGroupOffsetsSpec());
}
@SuppressWarnings({"deprecation"})
private static <T> scala.collection.immutable.Set<T> set(final Collection<T> set) {
return JavaConverters.asScalaSet(new HashSet<>(set)).toSet();
}
}

View File

@ -17,17 +17,18 @@
package org.apache.kafka.tools.consumer.group;
import joptsimple.OptionException;
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -60,10 +61,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
String output = kafka.utils.TestUtils.grabConsoleOutput(() -> {
service.deleteGroups();
return null;
});
String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
}
@ -78,8 +76,8 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
scala.collection.Map<String, Throwable> result = service.deleteGroups();
assertTrue(result.size() == 1 && result.contains(missingGroup) && result.get(missingGroup).get().getCause() instanceof GroupIdNotFoundException,
Map<String, Throwable> result = service.deleteGroups();
assertTrue(result.size() == 1 && result.containsKey(missingGroup) && result.get(missingGroup).getCause() instanceof GroupIdNotFoundException,
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
}
@ -94,14 +92,11 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.collectGroupMembers(GROUP, false)._2.get().size() == 1,
() -> service.collectGroupMembers(GROUP, false).v2.get().size() == 1,
"The group did not initialize as expected."
);
String output = kafka.utils.TestUtils.grabConsoleOutput(() -> {
service.deleteGroups();
return null;
});
String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
}
@ -117,14 +112,14 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.collectGroupMembers(GROUP, false)._2.get().size() == 1,
() -> service.collectGroupMembers(GROUP, false).v2.get().size() == 1,
"The group did not initialize as expected."
);
scala.collection.Map<String, Throwable> result = service.deleteGroups();
assertNotNull(result.get(GROUP).get(),
Map<String, Throwable> result = service.deleteGroups();
assertNotNull(result.get(GROUP),
"Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
assertTrue(result.size() == 1 && result.contains(GROUP) && result.get(GROUP).get().getCause() instanceof GroupNotEmptyException,
assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP).getCause() instanceof GroupNotEmptyException,
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
}
@ -139,21 +134,18 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state(), "Stable"),
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
"The group did not initialize as expected."
);
executor.shutdown();
TestUtils.waitForCondition(
() -> Objects.equals(service.collectGroupState(GROUP).state(), "Empty"),
() -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
"The group did not become empty as expected."
);
String output = kafka.utils.TestUtils.grabConsoleOutput(() -> {
service.deleteGroups();
return null;
});
String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
assertTrue(output.contains("Deletion of requested consumer groups ('" + GROUP + "') was successful."),
"The consumer group could not be deleted as expected");
}
@ -173,10 +165,10 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() ->
Objects.equals(service.listConsumerGroups().toSet(), set(groups.keySet())) &&
new HashSet<>(service.listConsumerGroups()).equals(groups.keySet()) &&
groups.keySet().stream().allMatch(groupId -> {
try {
return Objects.equals(service.collectGroupState(groupId).state(), "Stable");
return Objects.equals(service.collectGroupState(groupId).state, "Stable");
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -189,17 +181,14 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
TestUtils.waitForCondition(() ->
groups.keySet().stream().allMatch(groupId -> {
try {
return Objects.equals(service.collectGroupState(groupId).state(), "Empty");
return Objects.equals(service.collectGroupState(groupId).state, "Empty");
} catch (Exception e) {
throw new RuntimeException(e);
}
}),
"The group did not become empty as expected.");
String output = kafka.utils.TestUtils.grabConsoleOutput(() -> {
service.deleteGroups();
return null;
}).trim();
String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups).trim();
Set<String> expectedGroupsForDeletion = groups.keySet();
Set<String> deletedGroupsGrepped = Arrays.stream(output.substring(output.indexOf('(') + 1, output.indexOf(')')).split(","))
.map(str -> str.replaceAll("'", "").trim()).collect(Collectors.toSet());
@ -220,17 +209,17 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state(), "Stable"),
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
"The group did not initialize as expected.");
executor.shutdown();
TestUtils.waitForCondition(
() -> Objects.equals(service.collectGroupState(GROUP).state(), "Empty"),
() -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
"The group did not become empty as expected.");
scala.collection.Map<String, Throwable> result = service.deleteGroups();
assertTrue(result.size() == 1 && result.contains(GROUP) && result.get(GROUP).get() == null,
Map<String, Throwable> result = service.deleteGroups();
assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP) == null,
"The consumer group could not be deleted as expected");
}
@ -246,23 +235,20 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state(), "Stable"),
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
"The group did not initialize as expected.");
executor.shutdown();
TestUtils.waitForCondition(
() -> Objects.equals(service.collectGroupState(GROUP).state(), "Empty"),
() -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
"The group did not become empty as expected.");
cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup};
ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
String output = kafka.utils.TestUtils.grabConsoleOutput(() -> {
service2.deleteGroups();
return null;
});
String output = ToolsTestUtils.grabConsoleOutput(service2::deleteGroups);
assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:")
&& output.contains(Errors.GROUP_ID_NOT_FOUND.message())
&& output.contains("These consumer groups were deleted successfully: '" + GROUP + "'"),
@ -281,23 +267,23 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state(), "Stable"),
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
"The group did not initialize as expected.");
executor.shutdown();
TestUtils.waitForCondition(
() -> Objects.equals(service.collectGroupState(GROUP).state(), "Empty"),
() -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
"The group did not become empty as expected.");
cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup};
ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
scala.collection.Map<String, Throwable> result = service2.deleteGroups();
Map<String, Throwable> result = service2.deleteGroups();
assertTrue(result.size() == 2 &&
result.contains(GROUP) && result.get(GROUP).get() == null &&
result.contains(missingGroup) &&
result.get(missingGroup).get().getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()),
result.containsKey(GROUP) && result.get(GROUP) == null &&
result.containsKey(missingGroup) &&
result.get(missingGroup).getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()),
"The consumer group deletion did not work as expected");
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools.consumer.group;
import kafka.admin.ConsumerGroupCommand;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -30,10 +29,12 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.config.Defaults;
import org.apache.kafka.tools.Tuple2;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@ -59,8 +60,8 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGr
String topic = "foo:1";
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic));
scala.Tuple2<Errors, scala.collection.Map<TopicPartition, Throwable>> res = service.deleteOffsets(group, seq(Collections.singleton(topic)).toList());
assertEquals(Errors.GROUP_ID_NOT_FOUND, res._1);
Tuple2<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets(group, Collections.singletonList(topic));
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.v1);
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ -144,18 +145,18 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGr
withConsumerGroup.accept(() -> {
String topic = inputPartition >= 0 ? inputTopic + ":" + inputPartition : inputTopic;
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(GROUP, topic));
scala.Tuple2<Errors, scala.collection.Map<TopicPartition, Throwable>> res = service.deleteOffsets(GROUP, seq(Collections.singletonList(topic)).toList());
Errors topLevelError = res._1;
scala.collection.Map<TopicPartition, Throwable> partitions = res._2;
Tuple2<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets(GROUP, Collections.singletonList(topic));
Errors topLevelError = res.v1;
Map<TopicPartition, Throwable> partitions = res.v2;
TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);
// Partition level error should propagate to top level, unless this is due to a missed partition attempt.
if (inputPartition >= 0) {
assertEquals(expectedError, topLevelError);
}
if (expectedError == Errors.NONE)
assertNull(partitions.get(tp).get());
assertNull(partitions.get(tp));
else
assertEquals(expectedError.exception(), partitions.get(tp).get().getCause());
assertEquals(expectedError.exception(), partitions.get(tp).getCause());
});
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools.consumer.group;
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
@ -24,17 +23,15 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.Tuple2;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.collection.Seq;
import scala.runtime.BoxedUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -43,6 +40,8 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.RANDOM;
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
@ -81,7 +80,7 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
String output = kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service));
String output = ToolsTestUtils.grabConsoleOutput(describeGroups(service));
assertTrue(output.contains("Consumer group '" + missingGroup + "' does not exist."),
"Expected error was not detected for describe option '" + String.join(" ", describeType) + "'");
}
@ -129,7 +128,7 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupProtocol) {
public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupProtocol) throws Exception {
String group = "missing.group";
createOffsetsTopic(listenerName(), new Properties());
@ -139,14 +138,14 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
assertTrue(res._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res._2.map(Seq::isEmpty).getOrElse(() -> false),
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
assertTrue(res.v1.map(s -> s.contains("Dead")).orElse(false) && res.v2.map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + group + "'.");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeMembersOfNonExistingGroup(String quorum, String groupProtocol) {
public void testDescribeMembersOfNonExistingGroup(String quorum, String groupProtocol) throws Exception {
String group = "missing.group";
createOffsetsTopic(listenerName(), new Properties());
@ -156,18 +155,18 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
assertTrue(res._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res._2.map(Seq::isEmpty).getOrElse(() -> false),
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
assertTrue(res.v1.map(s -> s.contains("Dead")).orElse(false) && res.v2.map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + group + "'.");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true);
assertTrue(res2._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res2._2.map(Seq::isEmpty).getOrElse(() -> false),
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true);
assertTrue(res2.v1.map(s -> s.contains("Dead")).orElse(false) && res2.v2.map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option).");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeStateOfNonExistingGroup(String quorum, String groupProtocol) {
public void testDescribeStateOfNonExistingGroup(String quorum, String groupProtocol) throws Exception {
String group = "missing.group";
createOffsetsTopic(listenerName(), new Properties());
@ -177,9 +176,9 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
ConsumerGroupCommand.GroupState state = service.collectGroupState(group);
assertTrue(Objects.equals(state.state(), "Dead") && state.numMembers() == 0 &&
state.coordinator() != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator().id()).isEmpty(),
GroupState state = service.collectGroupState(group);
assertTrue(Objects.equals(state.state, "Dead") && state.numMembers == 0 &&
state.coordinator != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator.id()).isEmpty(),
"Expected the state to be 'Dead', with no members in the group '" + group + "'."
);
}
@ -198,8 +197,8 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
return res._1.trim().split("\n").length == 2 && res._2.isEmpty();
Tuple2<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
return res.v1.trim().split("\n").length == 2 && res.v2.isEmpty();
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
@ -227,9 +226,9 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
long numLines = Arrays.stream(res._1.trim().split("\n")).filter(line -> !line.isEmpty()).count();
return (numLines == expectedNumLines) && res._2.isEmpty();
Tuple2<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
long numLines = Arrays.stream(res.v1.trim().split("\n")).filter(line -> !line.isEmpty()).count();
return (numLines == expectedNumLines) && res.v2.isEmpty();
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
@ -253,9 +252,9 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
long numLines = Arrays.stream(res._1.trim().split("\n")).filter(s -> !s.isEmpty()).count();
return (numLines == expectedNumLines) && res._2.isEmpty();
Tuple2<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
long numLines = Arrays.stream(res.v1.trim().split("\n")).filter(s -> !s.isEmpty()).count();
return (numLines == expectedNumLines) && res.v2.isEmpty();
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
@ -272,26 +271,28 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
Option<String> state = groupOffsets._1;
Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>> assignments = groupOffsets._2;
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
Optional<String> state = groupOffsets.v1;
Optional<Collection<PartitionAssignmentState>> assignments = groupOffsets.v2;
Function1<ConsumerGroupCommand.PartitionAssignmentState, Object> isGrp = s -> Objects.equals(s.group(), GROUP);
Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
boolean res = state.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
assignments.isDefined() &&
assignments.get().count(isGrp) == 1;
boolean res = state.map(s -> s.contains("Stable")).orElse(false) &&
assignments.isPresent() &&
assignments.get().stream().filter(isGrp).count() == 1;
if (!res)
return false;
@SuppressWarnings("cast")
ConsumerGroupCommand.PartitionAssignmentState partitionState =
(ConsumerGroupCommand.PartitionAssignmentState) assignments.get().filter(isGrp).head();
Optional<PartitionAssignmentState> maybePartitionState = assignments.get().stream().filter(isGrp).findFirst();
if (!maybePartitionState.isPresent())
return false;
return !partitionState.consumerId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
!partitionState.clientId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
!partitionState.host().map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false);
PartitionAssignmentState partitionState = maybePartitionState.get();
return !partitionState.consumerId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
!partitionState.clientId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
!partitionState.host.map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
}, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group " + GROUP + ".");
}
@ -306,32 +307,34 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> groupMembers = service.collectGroupMembers(GROUP, false);
Option<String> state = groupMembers._1;
Option<Seq<ConsumerGroupCommand.MemberAssignmentState>> assignments = groupMembers._2;
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> groupMembers = service.collectGroupMembers(GROUP, false);
Optional<String> state = groupMembers.v1;
Optional<Collection<MemberAssignmentState>> assignments = groupMembers.v2;
Function1<ConsumerGroupCommand.MemberAssignmentState, Object> isGrp = s -> Objects.equals(s.group(), GROUP);
Predicate<MemberAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
boolean res = state.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
assignments.isDefined() &&
assignments.get().count(s -> Objects.equals(s.group(), GROUP)) == 1;
boolean res = state.map(s -> s.contains("Stable")).orElse(false) &&
assignments.isPresent() &&
assignments.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1;
if (!res)
return false;
@SuppressWarnings("cast")
ConsumerGroupCommand.MemberAssignmentState assignmentState =
(ConsumerGroupCommand.MemberAssignmentState) assignments.get().filter(isGrp).head();
Optional<MemberAssignmentState> maybeAssignmentState = assignments.get().stream().filter(isGrp).findFirst();
if (!maybeAssignmentState.isPresent())
return false;
return !Objects.equals(assignmentState.consumerId(), ConsumerGroupCommand.MISSING_COLUMN_VALUE()) &&
!Objects.equals(assignmentState.clientId(), ConsumerGroupCommand.MISSING_COLUMN_VALUE()) &&
!Objects.equals(assignmentState.host(), ConsumerGroupCommand.MISSING_COLUMN_VALUE());
MemberAssignmentState assignmentState = maybeAssignmentState.get();
return !Objects.equals(assignmentState.consumerId, ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
!Objects.equals(assignmentState.clientId, ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
!Objects.equals(assignmentState.host, ConsumerGroupCommand.MISSING_COLUMN_VALUE);
}, "Expected a 'Stable' group status, rows and valid member information for group " + GROUP + ".");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
if (res._2.isDefined()) {
assertTrue(res._2.get().size() == 1 && res._2.get().iterator().next().assignment().size() == 1,
if (res.v2.isPresent()) {
assertTrue(res.v2.get().size() == 1 && res.v2.get().iterator().next().assignment.size() == 1,
"Expected a topic partition assigned to the single group member for group " + GROUP);
} else {
fail("Expected partition assignments for members of group " + GROUP);
@ -354,12 +357,12 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") &&
state.numMembers() == 1 &&
Objects.equals(state.assignmentStrategy(), "range") &&
state.coordinator() != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0;
GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state, "Stable") &&
state.numMembers == 1 &&
Objects.equals(state.assignmentStrategy, "range") &&
state.coordinator != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0;
}, "Expected a 'Stable' group status, with one member and round robin assignment strategy for group " + GROUP + ".");
}
@ -381,12 +384,12 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") &&
state.numMembers() == 1 &&
Objects.equals(state.assignmentStrategy(), expectedName) &&
state.coordinator() != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0;
GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state, "Stable") &&
state.numMembers == 1 &&
Objects.equals(state.assignmentStrategy, expectedName) &&
state.coordinator != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0;
}, "Expected a 'Stable' group status, with one member and " + expectedName + " assignment strategy for group " + GROUP + ".");
}
@ -404,14 +407,14 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
return res._1.trim().split("\n").length == 2 && res._2.isEmpty();
Tuple2<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
return res.v1.trim().split("\n").length == 2 && res.v2.isEmpty();
}, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(
() -> kafka.utils.TestUtils.grabConsoleError(describeGroups(service)).contains("Consumer group '" + group + "' has no active members."),
() -> ToolsTestUtils.grabConsoleError(describeGroups(service)).contains("Consumer group '" + group + "' has no active members."),
"Expected no active member in describe group results with describe type " + String.join(" ", describeType));
}
}
@ -428,26 +431,25 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false)
&& res._2.map(c -> c.exists(assignment -> Objects.equals(assignment.group(), GROUP) && assignment.offset().isDefined())).getOrElse(() -> false);
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res.v1.map(s -> s.contains("Stable")).orElse(false)
&& res.v2.map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, GROUP) && assignment.offset.isPresent())).orElse(false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> offsets = service.collectGroupOffsets(GROUP);
Option<String> state = offsets._1;
Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>> assignments = offsets._2;
@SuppressWarnings("unchecked")
Seq<ConsumerGroupCommand.PartitionAssignmentState> testGroupAssignments = assignments.get().filter(a -> Objects.equals(a.group(), GROUP)).toSeq();
ConsumerGroupCommand.PartitionAssignmentState assignment = testGroupAssignments.head();
return state.map(s -> s.contains("Empty")).getOrElse(() -> false) &&
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(GROUP);
Optional<String> state = offsets.v1;
Optional<Collection<PartitionAssignmentState>> assignments = offsets.v2;
List<PartitionAssignmentState> testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, GROUP)).collect(Collectors.toList());
PartitionAssignmentState assignment = testGroupAssignments.get(0);
return state.map(s -> s.contains("Empty")).orElse(false) &&
testGroupAssignments.size() == 1 &&
assignment.consumerId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && // the member should be gone
assignment.clientId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
assignment.host().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false);
assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone
assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
assignment.host.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
}, "failed to collect group offsets");
}
@ -463,17 +465,17 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false)
&& res._2.map(c -> c.exists(m -> Objects.equals(m.group(), GROUP))).getOrElse(() -> false);
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res.v1.map(s -> s.contains("Stable")).orElse(false)
&& res.v2.map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, GROUP))).orElse(false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Empty")).getOrElse(() -> false) && res._2.isDefined() && res._2.get().isEmpty();
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res.v1.map(s -> s.contains("Empty")).orElse(false) && res.v2.isPresent() && res.v2.get().isEmpty();
}, "Expected no member in describe group members results for group '" + GROUP + "'");
}
@ -489,19 +491,19 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") &&
state.numMembers() == 1 &&
state.coordinator() != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0;
GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state, "Stable") &&
state.numMembers == 1 &&
state.coordinator != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0;
}, "Expected the group '" + GROUP + "' to initially become stable, and have a single member.");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Empty") && state.numMembers() == 0;
GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state, "Empty") && state.numMembers == 0;
}, "Expected the group '" + GROUP + "' to become empty after the only member leaving.");
}
@ -519,9 +521,9 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
Tuple2<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2;
return res._2.isEmpty() && res._1.trim().split("\n").length == expectedNumRows;
return res.v2.isEmpty() && res.v1.trim().split("\n").length == expectedNumRows;
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
}
}
@ -538,11 +540,11 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 1 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.partition().isDefined()) == 1;
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res.v1.map(s -> s.contains("Stable")).isPresent() &&
res.v2.isPresent() &&
res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1 &&
res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 1;
}, "Expected rows for consumers with no assigned partitions in describe group results");
}
@ -558,18 +560,18 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 1) == 1 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 0) == 1 &&
res._2.get().forall(s -> s.assignment().isEmpty());
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res.v1.map(s -> s.contains("Stable")).orElse(false) &&
res.v2.isPresent() &&
res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 1 &&
res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0).count() == 1 &&
res.v2.get().stream().allMatch(s -> s.assignment.isEmpty());
}, "Expected rows for consumers with no assigned partitions in describe group results");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
assertTrue(res._1.map(s -> s.contains("Stable")).getOrElse(() -> false)
&& res._2.map(c -> c.exists(s -> !s.assignment().isEmpty())).getOrElse(() -> false),
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
assertTrue(res.v1.map(s -> s.contains("Stable")).orElse(false)
&& res.v2.map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false),
"Expected additional columns in verbose version of describe members");
}
@ -585,8 +587,8 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") && state.numMembers() == 2;
GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state, "Stable") && state.numMembers == 2;
}, "Expected two consumers in describe group results");
}
@ -606,9 +608,9 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
Tuple2<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3;
return res._2.isEmpty() && res._1.trim().split("\n").length == expectedNumRows;
return res.v2.isEmpty() && res.v1.trim().split("\n").length == expectedNumRows;
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
}
}
@ -627,12 +629,12 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.partition().isDefined()) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && !x.partition().isDefined()) == 0;
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res.v1.map(s -> s.contains("Stable")).orElse(false) &&
res.v2.isPresent() &&
res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 2 &&
res.v2.get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && !x.partition.isPresent());
}, "Expected two rows (one row per consumer) in describe group results.");
}
@ -650,16 +652,16 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 1) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 0) == 0;
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res.v1.map(s -> s.contains("Stable")).orElse(false) &&
res.v2.isPresent() &&
res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 2 &&
res.v2.get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0);
}, "Expected two rows (one row per consumer) in describe group members results.");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
assertTrue(res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && res._2.map(s -> s.count(x -> x.assignment().isEmpty())).getOrElse(() -> 0) == 0,
Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
assertTrue(res.v1.map(s -> s.contains("Stable")).orElse(false) && res.v2.map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
"Expected additional columns in verbose version of describe members");
}
@ -677,8 +679,8 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") && Objects.equals(state.group(), GROUP) && state.numMembers() == 2;
GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state, "Stable") && Objects.equals(state.group, GROUP) && state.numMembers == 2;
}, "Expected a stable group with two members in describe group state result.");
}
@ -696,9 +698,9 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Empty")).getOrElse(() -> false)
&& res._2.isDefined() && res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2;
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res.v1.map(s -> s.contains("Empty")).orElse(false)
&& res.v2.isPresent() && res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2;
}, "Expected a stable group with two members in describe group state result.");
}
@ -796,32 +798,33 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
Function1<ConsumerGroupCommand.PartitionAssignmentState, Object> isGrp = s -> Objects.equals(s.group(), GROUP);
Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
boolean res = groupOffsets._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
groupOffsets._2.isDefined() &&
groupOffsets._2.get().count(isGrp) == 1;
boolean res = groupOffsets.v1.map(s -> s.contains("Stable")).orElse(false) &&
groupOffsets.v2.isPresent() &&
groupOffsets.v2.get().stream().filter(isGrp).count() == 1;
if (!res)
return false;
@SuppressWarnings("cast")
ConsumerGroupCommand.PartitionAssignmentState assignmentState =
(ConsumerGroupCommand.PartitionAssignmentState) groupOffsets._2.get().filter(isGrp).head();
Optional<PartitionAssignmentState> maybeAssignmentState = groupOffsets.v2.get().stream().filter(isGrp).findFirst();
if (!maybeAssignmentState.isPresent())
return false;
return assignmentState.consumerId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
assignmentState.clientId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
assignmentState.host().map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false);
PartitionAssignmentState assignmentState = maybeAssignmentState.get();
return assignmentState.consumerId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
assignmentState.clientId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
assignmentState.host.map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
}, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group " + GROUP + ".");
}
private Function0<BoxedUnit> describeGroups(ConsumerGroupCommand.ConsumerGroupService service) {
private Runnable describeGroups(ConsumerGroupCommand.ConsumerGroupService service) {
return () -> {
try {
service.describeGroups();
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -17,16 +17,17 @@
package org.apache.kafka.tools.consumer.group;
import joptsimple.OptionException;
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.apache.kafka.tools.ToolsTestUtils;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@ -58,11 +59,11 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new AtomicReference<>();
Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP));
final AtomicReference<Set> foundGroups = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
foundGroups.set(set(service.listConsumerGroups()));
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + ".");
}
@ -272,7 +273,7 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
@Test
public void testConsumerGroupStatesFromString() {
scala.collection.Set<ConsumerGroupState> result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable");
Set<ConsumerGroupState> result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable");
assertEquals(set(Collections.singleton(ConsumerGroupState.STABLE)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable, PreparingRebalance");
@ -299,7 +300,7 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
@Test
public void testConsumerGroupTypesFromString() {
scala.collection.Set<GroupType> result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer");
Set<GroupType> result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer");
assertEquals(set(Collections.singleton(GroupType.CONSUMER)), result);
result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic");
@ -475,9 +476,9 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
Set<ConsumerGroupState> stateFilterSet,
Set<ConsumerGroupListing> expectedListing
) throws Exception {
final AtomicReference<scala.collection.Set> foundListing = new AtomicReference<>();
final AtomicReference<Set<ConsumerGroupListing>> foundListing = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundListing.set(service.listConsumerGroupsWithFilters(set(typeFilterSet), set(stateFilterSet)).toSet());
foundListing.set(set(service.listConsumerGroupsWithFilters(set(typeFilterSet), set(stateFilterSet))));
return Objects.equals(set(expectedListing), foundListing.get());
}, () -> "Expected to show groups " + expectedListing + ", but found " + foundListing.get() + ".");
}
@ -498,7 +499,7 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
) throws InterruptedException {
final AtomicReference<String> out = new AtomicReference<>("");
TestUtils.waitForCondition(() -> {
String output = runAndGrabConsoleOutput(args);
String output = ToolsTestUtils.grabConsoleOutput(() -> ConsumerGroupCommand.main(args.toArray(new String[0])));
out.set(output);
int index = 0;
@ -522,12 +523,7 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
}, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
}
private static String runAndGrabConsoleOutput(
List<String> args
) {
return kafka.utils.TestUtils.grabConsoleOutput(() -> {
ConsumerGroupCommand.main(args.toArray(new String[0]));
return null;
});
public static <T> Set<T> set(Collection<T> set) {
return new HashSet<>(set);
}
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.tools.consumer.group;
import joptsimple.OptionException;
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@ -25,7 +24,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import scala.Option;
import java.io.BufferedWriter;
import java.io.File;
@ -98,10 +96,10 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args);
// Make sure we got a coordinator
TestUtils.waitForCondition(
() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator().host(), "localhost"),
() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"),
"Can't find a coordinator");
Option<scala.collection.Map<TopicPartition, OffsetAndMetadata>> resetOffsets = consumerGroupCommand.resetOffsets().get(group);
assertTrue(resetOffsets.isDefined() && resetOffsets.get().isEmpty());
Map<TopicPartition, OffsetAndMetadata> resetOffsets = consumerGroupCommand.resetOffsets().get(group);
assertTrue(resetOffsets.isEmpty());
assertTrue(committedOffsets(TOPIC, group).isEmpty());
}
@ -211,7 +209,7 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
}
@Test
public void testResetOffsetsByDurationFallbackToLatestWhenNoRecords() throws Exception {
public void testResetOffsetsByDurationFallbackToLatestWhenNoRecords() {
String topic = "foo2";
String[] args = buildArgsForGroup(GROUP, "--topic", topic, "--by-duration", "PT1M", "--execute");
createTopic(topic, 1, 1, new Properties(), listenerName(), new Properties());
@ -326,7 +324,7 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
TopicPartition tp1 = new TopicPartition(topic1, 0);
TopicPartition tp2 = new TopicPartition(topic2, 0);
Map<TopicPartition, Long> allResetOffsets = toOffsetMap(consumerGroupCommand.resetOffsets().get(GROUP));
Map<TopicPartition, Long> allResetOffsets = toOffsetMap(resetOffsets(consumerGroupCommand).get(GROUP));
Map<TopicPartition, Long> expMap = new HashMap<>();
expMap.put(tp1, 0L);
expMap.put(tp2, 0L);
@ -357,7 +355,7 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
TopicPartition tp1 = new TopicPartition(topic1, 1);
TopicPartition tp2 = new TopicPartition(topic2, 1);
Map<TopicPartition, Long> allResetOffsets = toOffsetMap(consumerGroupCommand.resetOffsets().get(GROUP));
Map<TopicPartition, Long> allResetOffsets = toOffsetMap(resetOffsets(consumerGroupCommand).get(GROUP));
Map<TopicPartition, Long> expMap = new HashMap<>();
expMap.put(tp1, 0L);
expMap.put(tp2, 0L);
@ -386,7 +384,7 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
File file = TestUtils.tempFile("reset", ".csv");
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = consumerGroupCommand.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = consumerGroupCommand.resetOffsets();
BufferedWriter bw = new BufferedWriter(new FileWriter(file));
bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
bw.close();
@ -398,7 +396,7 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
String[] cgcArgsExec = buildArgsForGroup(GROUP, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec);
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> importedOffsets = consumerGroupCommandExec.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets = consumerGroupCommandExec.resetOffsets();
assertEquals(exp1, toOffsetMap(importedOffsets.get(GROUP)));
adminZkClient().deleteTopic(topic);
@ -430,7 +428,7 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
File file = TestUtils.tempFile("reset", ".csv");
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = consumerGroupCommand.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = consumerGroupCommand.resetOffsets();
BufferedWriter bw = new BufferedWriter(new FileWriter(file));
bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
bw.close();
@ -447,14 +445,14 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
// Multiple --group's offset import
String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec);
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> importedOffsets = consumerGroupCommandExec.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets = consumerGroupCommandExec.resetOffsets();
assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
// Single --group offset import using "group,topic,partition,offset" csv format
String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2);
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> importedOffsets2 = consumerGroupCommandExec2.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets2 = consumerGroupCommandExec2.resetOffsets();
assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1)));
adminZkClient().deleteTopic(TOPIC);
@ -503,9 +501,9 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService consumerGroupService, String group) throws Exception {
TestUtils.waitForCondition(() -> {
String state = consumerGroupService.collectGroupState(group).state();
String state = consumerGroupService.collectGroupState(group).state;
return Objects.equals(state, "Empty") || Objects.equals(state, "Dead");
}, "Expected that consumer group is inactive. Actual state: " + consumerGroupService.collectGroupState(group).state());
}, "Expected that consumer group is inactive. Actual state: " + consumerGroupService.collectGroupState(group).state);
}
private void resetAndAssertOffsets(String[] args,
@ -521,25 +519,17 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
Map<String, Map<TopicPartition, Long>> expectedOffsets = topics.stream().collect(Collectors.toMap(
Function.identity(),
topic -> Collections.singletonMap(new TopicPartition(topic, 0), expectedOffset)));
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> resetOffsetsResultByGroup = consumerGroupCommand.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsetsResultByGroup = resetOffsets(consumerGroupCommand);
try {
for (final String topic : topics) {
resetOffsetsResultByGroup.foreach(entry -> {
String group = entry._1;
scala.collection.Map<TopicPartition, OffsetAndMetadata> partitionInfo = entry._2;
resetOffsetsResultByGroup.forEach((group, partitionInfo) -> {
Map<TopicPartition, Long> priorOffsets = committedOffsets(topic, group);
Map<TopicPartition, Long> offsets = new HashMap<>();
partitionInfo.foreach(partitionInfoEntry -> {
TopicPartition tp = partitionInfoEntry._1;
OffsetAndMetadata offsetAndMetadata = partitionInfoEntry._2;
if (Objects.equals(tp.topic(), topic))
offsets.put(tp, offsetAndMetadata.offset());
return null;
});
assertEquals(expectedOffsets.get(topic), offsets);
assertEquals(expectedOffsets.get(topic),
partitionInfo.entrySet().stream()
.filter(entry -> Objects.equals(entry.getKey().topic(), topic))
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())));
assertEquals(dryRun ? priorOffsets : expectedOffsets.get(topic), committedOffsets(topic, group));
return null;
});
}
} finally {
@ -550,35 +540,22 @@ public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
private void resetAndAssertOffsetsCommitted(ConsumerGroupCommand.ConsumerGroupService consumerGroupService,
Map<TopicPartition, Long> expectedOffsets,
String topic) {
scala.collection.Map<String, scala.collection.Map<TopicPartition, OffsetAndMetadata>> allResetOffsets = consumerGroupService.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> allResetOffsets = resetOffsets(consumerGroupService);
allResetOffsets.foreach(entry -> {
String group = entry._1;
scala.collection.Map<TopicPartition, OffsetAndMetadata> offsetsInfo = entry._2;
offsetsInfo.foreach(offsetInfoEntry -> {
TopicPartition tp = offsetInfoEntry._1;
OffsetAndMetadata offsetMetadata = offsetInfoEntry._2;
allResetOffsets.forEach((group, offsetsInfo) -> {
offsetsInfo.forEach((tp, offsetMetadata) -> {
assertEquals(offsetMetadata.offset(), expectedOffsets.get(tp));
assertEquals(expectedOffsets, committedOffsets(topic, group));
return null;
});
return null;
});
}
Map<TopicPartition, Long> toOffsetMap(Option<scala.collection.Map<TopicPartition, OffsetAndMetadata>> map) {
assertTrue(map.isDefined());
Map<TopicPartition, Long> res = new HashMap<>();
map.foreach(m -> {
m.foreach(entry -> {
TopicPartition tp = entry._1;
OffsetAndMetadata offsetAndMetadata = entry._2;
res.put(tp, offsetAndMetadata.offset());
return null;
});
return null;
});
return res;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets(ConsumerGroupCommand.ConsumerGroupService consumerGroupService) {
return consumerGroupService.resetOffsets();
}
Map<TopicPartition, Long> toOffsetMap(Map<TopicPartition, OffsetAndMetadata> map) {
return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
}
private String[] addTo(String[] args, String...extra) {

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools.consumer.group;
import kafka.admin.ConsumerGroupCommand;
import kafka.api.AbstractSaslTest;
import kafka.api.Both$;
import kafka.utils.JaasTestUtils;
@ -36,7 +35,6 @@ import scala.Option;
import scala.Some$;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map$;
import java.io.File;
import java.io.IOException;
@ -164,8 +162,8 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
"--describe",
"--group", "test.group",
"--command-config", propsFile.getAbsolutePath()};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts = new ConsumerGroupCommand.ConsumerGroupCommandOptions(cgcArgs);
return new ConsumerGroupCommand.ConsumerGroupService(opts, Map$.MODULE$.empty());
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(cgcArgs);
return new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap());
}
private void verifyAuthenticationException(Executable action) {