KAFKA-5862: Remove ZK dependency from Streams reset tool, Part I

Author: Bill Bejeck <bill@confluent.io>
Author: bbejeck <bbejeck@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Ted Yu <yuzhihong@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3927 from bbejeck/KAFKA-5862_remove_zk_dependency_from_streams_reset_tool
Bill Bejeck, 2017-09-23 12:05:16 +08:00, committed by Guozhang Wang
parent 1fd70c7c94
commit 271f6b5aec
3 changed files with 102 additions and 79 deletions
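
The core of the change: topic listing and internal-topic deletion move from ZkUtils/TopicCommand (which talk to Zookeeper) to the Java AdminClient (KIP-117), which talks only to the brokers. Below is a minimal standalone sketch of that pattern, not the tool itself; the class name, broker address, application id, and timeouts are illustrative rather than taken from the commit.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.common.KafkaFuture;

public class InternalTopicCleanupSketch {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        // illustrative broker address, not from the commit
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (final AdminClient adminClient = AdminClient.create(props)) {
            // replaces zkUtils.getAllTopics(): fetch topic names from the brokers
            final Set<String> allTopics = adminClient.listTopics().names().get(60, TimeUnit.SECONDS);

            // same naming convention the reset tool uses to spot internal topics
            final String applicationId = "my-streams-app";  // hypothetical application id
            final List<String> internalTopics = new ArrayList<>();
            for (final String topic : allTopics) {
                if (topic.startsWith(applicationId + "-")
                        && (topic.endsWith("-changelog") || topic.endsWith("-repartition"))) {
                    internalTopics.add(topic);
                }
            }

            // replaces TopicCommand.deleteTopic(zkUtils, ...): delete through the AdminClient
            final DeleteTopicsResult deleteResult = adminClient.deleteTopics(internalTopics);
            for (final Map.Entry<String, KafkaFuture<Void>> entry : deleteResult.values().entrySet()) {
                entry.getValue().get(30, TimeUnit.SECONDS);  // surfaces per-topic failures
            }
        }
    }
}

The try-with-resources block stands in for the explicit kafkaAdminClient.close(60, TimeUnit.SECONDS) call that the tool makes in its finally block.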

View File

@@ -53,6 +53,7 @@
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="kafka.admin" />
<allow pkg="kafka.javaapi" />
<allow pkg="kafka.producer" />

View File

@@ -17,19 +17,14 @@
package kafka.tools;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.admin.AdminClient;
import kafka.admin.TopicCommand;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
@@ -38,8 +33,16 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
/**
* {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
@@ -68,7 +71,7 @@ public class StreamsResetter {
private static final int EXIT_CODE_ERROR = 1;
private static OptionSpec<String> bootstrapServerOption;
private static OptionSpec<String> zookeeperOption;
private static OptionSpecBuilder zookeeperOption;
private static OptionSpec<String> applicationIdOption;
private static OptionSpec<String> inputTopicsOption;
private static OptionSpec<String> intermediateTopicsOption;
@@ -89,52 +92,57 @@ public class StreamsResetter {
int exitCode = EXIT_CODE_SUCCESS;
AdminClient adminClient = null;
ZkUtils zkUtils = null;
KafkaAdminClient kafkaAdminClient = null;
try {
parseArguments(args);
dryRun = options.has(dryRunOption);
adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
final String groupId = options.valueOf(applicationIdOption);
validateNoActiveConsumers(groupId);
zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
30000,
30000,
JaasUtils.isZkSecurityEnabled());
final Properties adminClientProperties = new Properties();
adminClientProperties.put("bootstrap.servers", options.valueOf(bootstrapServerOption));
kafkaAdminClient = (KafkaAdminClient) AdminClient.create(adminClientProperties);
allTopics.clear();
allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
"Make sure to stop all running application instances before running the reset tool.");
}
allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
if (dryRun) {
System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
}
maybeResetInputAndSeekToEndIntermediateTopicOffsets();
maybeDeleteInternalTopics(zkUtils);
maybeDeleteInternalTopics(kafkaAdminClient);
} catch (final Throwable e) {
exitCode = EXIT_CODE_ERROR;
System.err.println("ERROR: " + e);
e.printStackTrace(System.err);
} finally {
if (adminClient != null) {
adminClient.close();
}
if (zkUtils != null) {
zkUtils.close();
if (kafkaAdminClient != null) {
kafkaAdminClient.close(60, TimeUnit.SECONDS);
}
}
return exitCode;
}
private void validateNoActiveConsumers(final String groupId) {
kafka.admin.AdminClient olderAdminClient = null;
try {
olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
+ "Make sure to stop all running application instances before running the reset tool.");
}
} finally {
if (olderAdminClient != null) {
olderAdminClient.close();
}
}
}
private void parseArguments(final String[] args) throws IOException {
final OptionParser optionParser = new OptionParser(false);
@@ -148,11 +156,8 @@ public class StreamsResetter {
.ofType(String.class)
.defaultsTo("localhost:9092")
.describedAs("urls");
zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper url with format: HOST:POST")
.withRequiredArg()
.ofType(String.class)
.defaultsTo("localhost:2181")
.describedAs("url");
zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.")
.withRequiredArg()
.ofType(String.class)
@@ -314,30 +319,46 @@ public class StreamsResetter {
return options.valuesOf(intermediateTopicsOption).contains(topic);
}
private void maybeDeleteInternalTopics(final ZkUtils zkUtils) {
private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient) {
System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
for (final String topic : allTopics) {
if (isInternalTopic(topic)) {
try {
if (!dryRun) {
final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
"--zookeeper", options.valueOf(zookeeperOption),
"--delete", "--topic", topic});
TopicCommand.deleteTopic(zkUtils, commandOptions);
} else {
System.out.println("Topic: " + topic);
}
} catch (final RuntimeException e) {
System.err.println("ERROR: Deleting topic " + topic + " failed.");
throw e;
List<String> topicsToDelete = new ArrayList<>();
for (final String listing : allTopics) {
if (isInternalTopic(listing)) {
if (!dryRun) {
topicsToDelete.add(listing);
} else {
System.out.println("Topic: " + listing);
}
}
}
if (!dryRun) {
doDelete(topicsToDelete, adminClient);
}
System.out.println("Done.");
}
private void doDelete(final List<String> topicsToDelete,
final KafkaAdminClient adminClient) {
boolean hasDeleteErrors = false;
final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
try {
entry.getValue().get(30, TimeUnit.SECONDS);
} catch (Exception e) {
System.err.println("ERROR: deleting topic " + entry.getKey());
e.printStackTrace(System.err);
hasDeleteErrors = true;
}
}
if (hasDeleteErrors) {
throw new RuntimeException("Encountered an error deleting one or more topics");
}
}
private boolean isInternalTopic(final String topicName) {
return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));

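One detail worth calling out from the hunk above: zookeeperOption changes from OptionSpec<String> to OptionSpecBuilder, so --zookeeper becomes a valueless flag that is still parsed for backward compatibility but never read. A minimal joptsimple sketch of that pattern follows; the class name and description string are illustrative, not taken from the commit.

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpecBuilder;

public class DeprecatedFlagSketch {

    public static void main(final String[] args) {
        final OptionParser parser = new OptionParser(false);
        // a valueless flag: accepted so existing invocations do not break, but its presence is ignored
        final OptionSpecBuilder zookeeperOption =
            parser.accepts("zookeeper", "Deprecated; the reset tool no longer accesses Zookeeper directly.");

        final OptionSet options = parser.parse("--zookeeper", "localhost:2181");
        System.out.println("--zookeeper passed (and ignored): " + options.has(zookeeperOption));
    }
}

Because the flag no longer consumes an argument, a value such as localhost:2181 passed after it simply lands in the parser's non-option arguments, which is why the integration test below can still include "--zookeeper", "localhost:2181" without breaking.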
View File

@@ -16,19 +16,14 @@
*/
package org.apache.kafka.streams.integration;
import kafka.admin.AdminClient;
import kafka.server.KafkaConfig$;
import kafka.tools.StreamsResetter;
import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -56,6 +51,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminClient;
import kafka.server.KafkaConfig$;
import kafka.tools.StreamsResetter;
import kafka.utils.MockTime;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -94,6 +95,7 @@ public class ResetIntegrationTest {
private static int testNo = 0;
private static AdminClient adminClient = null;
private static KafkaAdminClient kafkaAdminClient = null;
private final MockTime mockTime = CLUSTER.time;
private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
@@ -104,6 +106,11 @@
adminClient.close();
adminClient = null;
}
if (kafkaAdminClient != null) {
kafkaAdminClient.close(10, TimeUnit.SECONDS);
kafkaAdminClient = null;
}
}
@Before
@@ -114,6 +121,12 @@
adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
}
if (kafkaAdminClient == null) {
Properties props = new Properties();
props.put("bootstrap.servers", CLUSTER.bootstrapServers());
kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(props);
}
// busy wait until cluster (ie, ConsumerGroupCoordinator) is available
while (true) {
Thread.sleep(50);
@@ -338,20 +351,20 @@ public class ResetIntegrationTest {
}
private void cleanGlobal(final String intermediateUserTopic) {
// leaving --zookeeper arg here to ensure tool works if users add it
final String[] parameters;
if (intermediateUserTopic != null) {
parameters = new String[]{
"--application-id", APP_ID + testNo,
"--bootstrap-servers", CLUSTER.bootstrapServers(),
"--zookeeper", CLUSTER.zKConnectString(),
"--input-topics", INPUT_TOPIC,
"--intermediate-topics", INTERMEDIATE_USER_TOPIC
"--intermediate-topics", INTERMEDIATE_USER_TOPIC,
"--zookeeper", "localhost:2181"
};
} else {
parameters = new String[]{
"--application-id", APP_ID + testNo,
"--bootstrap-servers", CLUSTER.bootstrapServers(),
"--zookeeper", CLUSTER.zKConnectString(),
"--input-topics", INPUT_TOPIC
};
}
@@ -363,7 +376,7 @@ public class ResetIntegrationTest {
Assert.assertEquals(0, exitCode);
}
private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
if (intermediateUserTopic != null) {
@@ -374,25 +387,13 @@
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
Set<String> allTopics;
ZkUtils zkUtils = null;
try {
zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
30000,
30000,
JaasUtils.isZkSecurityEnabled());
final Set<String> allTopics = new HashSet<>();
do {
Utils.sleep(100);
allTopics = new HashSet<>();
allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
} while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size());
} finally {
if (zkUtils != null) {
zkUtils.close();
}
}
final ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS));
assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
}
private class WaitUntilConsumerGroupGotClosed implements TestCondition {