KAFKA-12878: Support --bootstrap-server in kafka-streams-application-reset tool (#12632)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Nikolay 2022-09-19 20:20:41 +03:00 committed by GitHub
parent 3e8e082fab
commit 51b079dca7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 31 additions and 13 deletions

View File

@ -90,7 +90,9 @@ import java.util.stream.Collectors;
public class StreamsResetter {
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_ERROR = 1;
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:9092";
private static OptionSpec<String> bootstrapServersOption;
private static OptionSpec<String> bootstrapServerOption;
private static OptionSpec<String> applicationIdOption;
private static OptionSpec<String> inputTopicsOption;
@ -155,7 +157,15 @@ public class StreamsResetter {
if (options.has(commandConfigOption)) {
properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
String bootstrapServerValue = BOOTSTRAP_SERVER_DEFAULT;
if (options.has(bootstrapServerOption))
bootstrapServerValue = options.valueOf(bootstrapServerOption);
else if (options.has(bootstrapServersOption))
bootstrapServerValue = options.valueOf(bootstrapServersOption);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerValue);
adminClient = Admin.create(properties);
maybeDeleteActiveConsumers(groupId, adminClient);
@ -213,11 +223,14 @@ public class StreamsResetter {
.ofType(String.class)
.describedAs("id")
.required();
bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
bootstrapServersOption = optionParser.accepts("bootstrap-servers", "DEPRECATED: Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
.withRequiredArg()
.ofType(String.class)
.defaultsTo("localhost:9092")
.describedAs("urls");
bootstrapServerOption = optionParser.accepts("bootstrap-server", "REQUIRED unless --bootstrap-servers(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. (default: localhost:9092)")
.withRequiredArg()
.ofType(String.class)
.describedAs("server to connect to");
inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool by default will reset the offset to the earliest available offset. "
+ "Reset to other offset position by appending other reset offset option, ex: --input-topics foo --shift-by 5")
.withRequiredArg()

View File

@ -84,9 +84,14 @@
--------------------- -----------
* --application-id &lt;String: id&gt; The Kafka Streams application ID
(application.id).
--bootstrap-servers &lt;String: urls&gt; Comma-separated list of broker urls with
format: HOST1:PORT1,HOST2:PORT2
(default: localhost:9092)
--bootstrap-server &lt;String: server to REQUIRED unless --bootstrap-servers
connect to&gt; (deprecated) is specified. The server
(s) to connect to. The broker list
string in the form HOST1:PORT1,HOST2:
PORT2.
--bootstrap-servers &lt;String: urls&gt; DEPRECATED: Comma-separated list of
broker urls with format: HOST1:PORT1,
HOST2:PORT2 (default: localhost:9092)
--by-duration &lt;String: urls&gt; Reset offsets to offset by duration from
current timestamp. Format: &#39;PnDTnHnMnS&#39;
--config-file &lt;String: file name&gt; Property file containing configs to be

View File

@ -394,7 +394,7 @@ public abstract class AbstractResetIntegrationTest {
final String appID) throws Exception {
final List<String> parameterList = new ArrayList<>(
Arrays.asList("--application-id", appID,
"--bootstrap-servers", cluster.bootstrapServers(),
"--bootstrap-server", cluster.bootstrapServers(),
"--input-topics", INPUT_TOPIC
));
if (withIntermediateTopics) {

View File

@ -104,7 +104,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-servers", cluster.bootstrapServers(),
"--bootstrap-server", cluster.bootstrapServers(),
"--input-topics", NON_EXISTING_TOPIC
};
final Properties cleanUpConfig = new Properties();
@ -128,7 +128,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-servers", cluster.bootstrapServers(),
"--bootstrap-server", cluster.bootstrapServers(),
"--input-topics", NON_EXISTING_TOPIC
};
final Properties cleanUpConfig = new Properties();
@ -144,7 +144,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-servers", cluster.bootstrapServers(),
"--bootstrap-server", cluster.bootstrapServers(),
"--intermediate-topics", NON_EXISTING_TOPIC
};
final Properties cleanUpConfig = new Properties();
@ -160,7 +160,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-servers", cluster.bootstrapServers(),
"--bootstrap-server", cluster.bootstrapServers(),
"--internal-topics", NON_EXISTING_TOPIC
};
final Properties cleanUpConfig = new Properties();
@ -176,7 +176,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-servers", cluster.bootstrapServers(),
"--bootstrap-server", cluster.bootstrapServers(),
"--internal-topics", INPUT_TOPIC
};
final Properties cleanUpConfig = new Properties();

View File

@ -544,7 +544,7 @@ class StreamsResetter(StreamsTestBaseService):
cmd = "(export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"%(kafka_run_class)s %(streams_class_name)s " \
"--bootstrap-servers %(bootstrap.servers)s " \
"--bootstrap-server %(bootstrap.servers)s " \
"--force " \
"--application-id %(application.id)s " \
"--input-topics %(input.topics)s " \