KAFKA-10357: Add validation method for internal topics (#10266)

For KIP-698, we need a way to validate internal topics before we create them. This PR adds a validation method to the InternalTopicManager for that purpose.

Reviewers: Rohan Desai <rohan@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bruno Cadonna 2021-03-11 18:55:30 +01:00 committed by GitHub
parent c534bf45ce
commit 800d9b5abc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 1000 additions and 63 deletions

View File

@ -36,7 +36,7 @@ public class DescribeConfigsResult {
private final Map<ConfigResource, KafkaFuture<Config>> futures;
DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) {
protected DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) {
this.futures = futures;
}

View File

@ -17,12 +17,19 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
@ -36,17 +43,23 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ClientUtils.QuietConsumerConfig;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class InternalTopicManager {
private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " +
"Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
"Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. " + BUG_ERROR_MESSAGE;
private final Logger log;
@ -91,6 +104,255 @@ public class InternalTopicManager {
}
}
static class ValidationResult {
private final Set<String> missingTopics = new HashSet<>();
private final Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
public void addMissingTopic(final String topic) {
missingTopics.add(topic);
}
public Set<String> missingTopics() {
return Collections.unmodifiableSet(missingTopics);
}
public void addMisconfiguration(final String topic, final String message) {
misconfigurationsForTopics.computeIfAbsent(topic, ignored -> new ArrayList<>())
.add(message);
}
public Map<String, List<String>> misconfigurationsForTopics() {
return Collections.unmodifiableMap(misconfigurationsForTopics);
}
}
/**
* Validates the internal topics passed.
*
* The validation of the internal topics verifies if the topics:
* - are missing on the brokers
* - have the expected number of partitions
* - have configured a clean-up policy that avoids data loss
*
* @param topicConfigs internal topics to validate
*
* @return validation results that contains
* - the set of missing internal topics on the brokers
* - descriptions of misconfigurations per topic
*/
public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
log.info("Starting to validate internal topics {}.", topicConfigs.keySet());
final long now = time.milliseconds();
final long deadline = now + retryTimeoutMs;
final ValidationResult validationResult = new ValidationResult();
final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
if (!topicDescriptionsStillToValidate.isEmpty()) {
final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
descriptionsForTopic = describeTopicsResult.values();
}
Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
if (!topicConfigsStillToValidate.isEmpty()) {
final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
topicConfigsStillToValidate.stream()
.map(topic -> new ConfigResource(Type.TOPIC, topic))
.collect(Collectors.toSet())
);
configsForTopic = describeConfigsResult.values().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
}
while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
if (!descriptionsForTopic.isEmpty()) {
doValidateTopic(
validationResult,
descriptionsForTopic,
topicConfigs,
topicDescriptionsStillToValidate,
(streamsSide, brokerSide) -> validatePartitionCount(validationResult, streamsSide, brokerSide)
);
}
if (!configsForTopic.isEmpty()) {
doValidateTopic(
validationResult,
configsForTopic,
topicConfigs,
topicConfigsStillToValidate,
(streamsSide, brokerSide) -> validateCleanupPolicy(validationResult, streamsSide, brokerSide)
);
}
maybeThrowTimeoutException(topicDescriptionsStillToValidate, topicConfigsStillToValidate, deadline);
if (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
Utils.sleep(100);
}
}
maybeSleep(topicDescriptionsStillToValidate, topicConfigsStillToValidate, deadline);
}
log.info("Completed validation of internal topics {}.", topicConfigs.keySet());
return validationResult;
}
private <V> void doValidateTopic(final ValidationResult validationResult,
final Map<String, KafkaFuture<V>> futuresForTopic,
final Map<String, InternalTopicConfig> topicsConfigs,
final Set<String> topicsStillToValidate,
final BiConsumer<InternalTopicConfig, V> validator) {
for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
final String topicName = topicConfig.name();
if (!futuresForTopic.containsKey(topicName)) {
throw new IllegalStateException("Description results do not contain topics to validate. " + BUG_ERROR_MESSAGE);
}
final KafkaFuture<V> future = futuresForTopic.get(topicName);
if (future.isDone()) {
try {
final V brokerSideTopicConfig = future.get();
validator.accept(topicConfig, brokerSideTopicConfig);
topicsStillToValidate.remove(topicName);
} catch (final ExecutionException executionException) {
final Throwable cause = executionException.getCause();
if (cause instanceof UnknownTopicOrPartitionException) {
log.info("Internal topic {} is missing",
topicName);
validationResult.addMissingTopic(topicName);
topicsStillToValidate.remove(topicName);
} else if (cause instanceof LeaderNotAvailableException) {
log.info("The leader of internal topic {} is not available.", topicName);
} else if (cause instanceof TimeoutException) {
log.info("Retrieving data for internal topic {} timed out.", topicName);
} else {
log.error("Unexpected error during internal topic validation: ", cause);
throw new StreamsException(
String.format("Could not validate internal topic %s for the following reason: ", topicName),
cause
);
}
} catch (final InterruptedException interruptedException) {
throw new InterruptException(interruptedException);
} finally {
futuresForTopic.remove(topicName);
}
}
}
}
private void maybeThrowTimeoutException(final Set<String> topicDescriptionsStillToValidate,
final Set<String> topicConfigsStillToValidate,
final long deadline) {
if (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
final long now = time.milliseconds();
if (now >= deadline) {
final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
"This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
log.error(timeoutError);
throw new TimeoutException(timeoutError);
}
}
}
private void maybeSleep(final Set<String> topicDescriptionsStillToValidate,
final Set<String> topicConfigsStillToValidate,
final long deadline) {
if (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
final long now = time.milliseconds();
log.info(
"Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
new HashSet<>(topicDescriptionsStillToValidate).addAll(topicConfigsStillToValidate),
retryBackOffMs,
deadline - now
);
Utils.sleep(retryBackOffMs);
}
}
private void validatePartitionCount(final ValidationResult validationResult,
final InternalTopicConfig topicConfig,
final TopicDescription topicDescription) {
final String topicName = topicConfig.name();
final int requiredPartitionCount = topicConfig.numberOfPartitions()
.orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
topicName + ". " + BUG_ERROR_MESSAGE));
final int actualPartitionCount = topicDescription.partitions().size();
if (actualPartitionCount != requiredPartitionCount) {
validationResult.addMisconfiguration(
topicName,
"Internal topic " + topicName + " requires " + requiredPartitionCount + " partitions, " +
"but the existing topic on the broker has " + actualPartitionCount + " partitions."
);
}
}
private void validateCleanupPolicy(final ValidationResult validationResult,
final InternalTopicConfig topicConfig,
final Config brokerSideTopicConfig) {
if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
} else if (topicConfig instanceof WindowedChangelogTopicConfig) {
validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
}
}
private void validateCleanupPolicyForUnwindowedChangelogs(final ValidationResult validationResult,
final InternalTopicConfig topicConfig,
final Config brokerSideTopicConfig) {
final String topicName = topicConfig.name();
final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
validationResult.addMisconfiguration(
topicName,
"Cleanup policy of existing internal topic " + topicName + " should not contain \""
+ TopicConfig.CLEANUP_POLICY_DELETE + "\"."
);
}
}
private void validateCleanupPolicyForWindowedChangelogs(final ValidationResult validationResult,
final InternalTopicConfig topicConfig,
final Config brokerSideTopicConfig) {
final String topicName = topicConfig.name();
final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
final long brokerSideRetentionMs =
Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
final Map<String, String> streamsSideConfig =
topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
final long streamsSideRetentionMs = Long.parseLong(streamsSideConfig.get(TopicConfig.RETENTION_MS_CONFIG));
if (brokerSideRetentionMs < streamsSideRetentionMs) {
validationResult.addMisconfiguration(
topicName,
"Retention time of existing internal topic " + topicName + " is " + brokerSideRetentionMs +
" but should be " + streamsSideRetentionMs + " or larger."
);
}
final String brokerSideRetentionBytes =
getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_BYTES_CONFIG, topicName);
if (brokerSideRetentionBytes != null) {
validationResult.addMisconfiguration(
topicName,
"Retention byte of existing internal topic " + topicName + " is set but it should be unset."
);
}
}
}
private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
final String configName,
final String topicName) {
final ConfigEntry brokerSideConfigEntry = brokerSideTopicConfig.get(configName);
if (brokerSideConfigEntry == null) {
throw new IllegalStateException("The config " + configName + " for topic " +
topicName + " could not be " + "retrieved from the brokers. " + BUG_ERROR_MESSAGE);
}
return brokerSideConfigEntry.value();
}
/**
* Prepares a set of given internal topics.
*