mirror of https://github.com/apache/kafka.git
MINOR: simplify ensure topic exists condition (#15458)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
aa0443eb60
commit
c254b22a48
|
@ -208,9 +208,9 @@ public abstract class TopicCommand {
|
||||||
* If set to true, the command will throw an exception if the topic with the
|
* If set to true, the command will throw an exception if the topic with the
|
||||||
* requested name does not exist.
|
* requested name does not exist.
|
||||||
*/
|
*/
|
||||||
private static void ensureTopicExists(List<String> foundTopics, String requestedTopic, Boolean requireTopicExists) {
|
private static void ensureTopicExists(List<String> foundTopics, Optional<String> requestedTopic, Boolean requireTopicExists) {
|
||||||
// If no topic name was mentioned, do not need to throw exception.
|
// If no topic name was mentioned, do not need to throw exception.
|
||||||
if (!(requestedTopic.isEmpty() || !Optional.ofNullable(requestedTopic).isPresent()) && requireTopicExists && foundTopics.isEmpty()) {
|
if (requestedTopic.isPresent() && !requestedTopic.get().isEmpty() && requireTopicExists && foundTopics.isEmpty()) {
|
||||||
// If given topic doesn't exist then throw exception
|
// If given topic doesn't exist then throw exception
|
||||||
throw new IllegalArgumentException(String.format("Topic '%s' does not exist as expected", requestedTopic));
|
throw new IllegalArgumentException(String.format("Topic '%s' does not exist as expected", requestedTopic));
|
||||||
}
|
}
|
||||||
|
@ -490,7 +490,7 @@ public abstract class TopicCommand {
|
||||||
public void alterTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
|
public void alterTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
|
||||||
CommandTopicPartition topic = new CommandTopicPartition(opts);
|
CommandTopicPartition topic = new CommandTopicPartition(opts);
|
||||||
List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics());
|
List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics());
|
||||||
ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
|
ensureTopicExists(topics, opts.topic(), !opts.ifExists());
|
||||||
|
|
||||||
if (!topics.isEmpty()) {
|
if (!topics.isEmpty()) {
|
||||||
Map<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>> topicsInfo = adminClient.describeTopics(topics).topicNameValues();
|
Map<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>> topicsInfo = adminClient.describeTopics(topics).topicNameValues();
|
||||||
|
@ -556,7 +556,7 @@ public abstract class TopicCommand {
|
||||||
if (useTopicId) {
|
if (useTopicId) {
|
||||||
ensureTopicIdExists(topicIds, inputTopicId.get(), !opts.ifExists());
|
ensureTopicIdExists(topicIds, inputTopicId.get(), !opts.ifExists());
|
||||||
} else {
|
} else {
|
||||||
ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
|
ensureTopicExists(topics, opts.topic(), !opts.ifExists());
|
||||||
}
|
}
|
||||||
List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
|
List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -632,7 +632,7 @@ public abstract class TopicCommand {
|
||||||
|
|
||||||
public void deleteTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
|
public void deleteTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
|
||||||
List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics());
|
List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics());
|
||||||
ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
|
ensureTopicExists(topics, opts.topic(), !opts.ifExists());
|
||||||
adminClient.deleteTopics(Collections.unmodifiableList(topics),
|
adminClient.deleteTopics(Collections.unmodifiableList(topics),
|
||||||
new DeleteTopicsOptions().retryOnQuotaViolation(false)
|
new DeleteTopicsOptions().retryOnQuotaViolation(false)
|
||||||
).all().get();
|
).all().get();
|
||||||
|
|
Loading…
Reference in New Issue