KAFKA-10000: Add all public-facing config properties (#11775)

Reviewers: Luke Chen <showuon@gmail.com>, Tom Bentley <tbentley@redhat.com>, Andrew Eugene Choi <andrew.choi@uwaterloo.ca>
Author: Chris Egerton, 2022-05-12 02:45:53 -04:00 (committed by GitHub)
parent 85cfa70f59
commit 7268284699
7 changed files with 365 additions and 28 deletions

SourceConnector.java

@@ -63,9 +63,8 @@ public abstract class SourceConnector extends Connector {
      *
      * @param connectorConfig the configuration that will be used for the connector
      * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
-     *         or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. If this method is overridden by a
-     *         connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot define its own
-     *         transaction boundaries.
+     *         or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise; may never be {@code null}. The default implementation
+     *         returns {@link ConnectorTransactionBoundaries#UNSUPPORTED}.
      * @since 3.3
      * @see TransactionContext
      */
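
For illustration, a minimal sketch of a connector written against the tightened contract (not part of this patch; the class and the "custom.transactions.enable" property are hypothetical, and the method shown is the one this javadoc documents):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
    import org.apache.kafka.connect.source.SourceConnector;

    public class SketchSourceConnector extends SourceConnector {

        // The point of the revised javadoc: return SUPPORTED or UNSUPPORTED, never null.
        @Override
        public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
            // "custom.transactions.enable" is a made-up property used only for illustration
            boolean selfManaged = Boolean.parseBoolean(
                    connectorConfig.getOrDefault("custom.transactions.enable", "false"));
            return selfManaged
                    ? ConnectorTransactionBoundaries.SUPPORTED
                    : ConnectorTransactionBoundaries.UNSUPPORTED;
        }

        // Minimal plumbing so the sketch compiles; a real connector implements these properly
        @Override public void start(Map<String, String> props) { }
        @Override public void stop() { }
        @Override public Class<? extends Task> taskClass() { return null; }
        @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.emptyList(); }
        @Override public ConfigDef config() { return new ConfigDef(); }
        @Override public String version() { return "0.0.1"; }
    }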

SourceTask.java

@@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.Task;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
@@ -64,6 +65,7 @@ public abstract class SourceTask implements Task {
      * @throws IllegalArgumentException if there is no transaction boundary type with the given name
      */
     public static TransactionBoundary fromProperty(String property) {
+        Objects.requireNonNull(property, "Value for transaction boundary property may not be null");
         return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());
     }
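
A quick usage sketch (not part of the patch) of how fromProperty behaves after this change:

    import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;

    public class FromPropertySketch {
        public static void main(String[] args) {
            // Parsing is case-insensitive and whitespace-tolerant
            TransactionBoundary a = TransactionBoundary.fromProperty(" poll ");    // POLL
            TransactionBoundary b = TransactionBoundary.fromProperty("Connector"); // CONNECTOR
            // A null value used to fail inside toUpperCase() with a bare NPE;
            // it now fails fast with the descriptive message added above
            TransactionBoundary c = TransactionBoundary.fromProperty(null);
        }
    }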

SourceConnectorConfig.java

@@ -20,21 +20,31 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.source.SourceTask;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.DEFAULT;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
+import static org.apache.kafka.common.utils.Utils.enumOptions;
 
 public class SourceConnectorConfig extends ConnectorConfig {
@@ -47,6 +57,57 @@ public class SourceConnectorConfig extends ConnectorConfig {
             + "created by source connectors";
     private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
 
+    protected static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once Support";
+
+    public enum ExactlyOnceSupportLevel {
+        REQUESTED,
+        REQUIRED;
+
+        public static ExactlyOnceSupportLevel fromProperty(String property) {
+            return valueOf(property.toUpperCase(Locale.ROOT).trim());
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
+    private static final String EXACTLY_ONCE_SUPPORT_DOC = "Permitted values are " + String.join(", ", enumOptions(ExactlyOnceSupportLevel.class)) + ". "
+            + "If set to \"" + REQUIRED + "\", forces a preflight check for the connector to ensure that it can provide exactly-once delivery "
+            + "with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to "
+            + "Connect that they support this; in that case, documentation for the connector should be consulted carefully before "
+            + "creating it, and the value for this property should be set to \"" + REQUESTED + "\". "
+            + "Additionally, if the value is set to \"" + REQUIRED + "\" but the worker that performs preflight validation does not have "
+            + "exactly-once support enabled for source connectors, requests to create or validate the connector will fail.";
+    private static final String EXACTLY_ONCE_SUPPORT_DISPLAY = "Exactly once support";
+
+    public static final String TRANSACTION_BOUNDARY_CONFIG = SourceTask.TRANSACTION_BOUNDARY_CONFIG;
+    private static final String TRANSACTION_BOUNDARY_DOC = "Permitted values are: " + String.join(", ", enumOptions(TransactionBoundary.class)) + ". "
+            + "If set to '" + POLL + "', a new producer transaction will be started and committed for every batch of records that each task from "
+            + "this connector provides to Connect. If set to '" + CONNECTOR + "', relies on connector-defined transaction boundaries; note that "
+            + "not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to instantiate a connector with "
+            + "this value will fail. Finally, if set to '" + INTERVAL + "', commits transactions only after a user-defined time interval has passed.";
+    private static final String TRANSACTION_BOUNDARY_DISPLAY = "Transaction Boundary";
+
+    public static final String TRANSACTION_BOUNDARY_INTERVAL_CONFIG = "transaction.boundary.interval.ms";
+    private static final String TRANSACTION_BOUNDARY_INTERVAL_DOC = "If '" + TRANSACTION_BOUNDARY_CONFIG + "' is set to '" + INTERVAL
+            + "', determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level "
+            + "'" + WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG + "' property. It has no effect if a different "
+            + TRANSACTION_BOUNDARY_CONFIG + " is specified.";
+    private static final String TRANSACTION_BOUNDARY_INTERVAL_DISPLAY = "Transaction boundary interval";
+
+    protected static final String OFFSETS_TOPIC_GROUP = "offsets.topic";
+
+    public static final String OFFSETS_TOPIC_CONFIG = "offsets.storage.topic";
+    private static final String OFFSETS_TOPIC_DOC = "The name of a separate offsets topic to use for this connector. "
+            + "If empty or not specified, the worker's global offsets topic name will be used. "
+            + "If specified, the offsets topic will be created if it does not already exist on the Kafka cluster targeted by this connector "
+            + "(which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer "
+            + "has been overridden from the worker's). Only applicable in distributed mode; in standalone mode, setting this property will have no effect.";
+    private static final String OFFSETS_TOPIC_DISPLAY = "Offsets topic";
+
     private static class EnrichedSourceConnectorConfig extends ConnectorConfig {
         EnrichedSourceConnectorConfig(Plugins plugins, ConfigDef configDef, Map<String, String> props) {
             super(plugins, configDef, props);
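
Note the deliberate asymmetry in the new enum: toString() lower-cases for display (so docs and defaults render as "requested"/"required") while fromProperty() parses case-insensitively, so the two round-trip. A small sketch, not part of the patch:

    import org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel;

    public class SupportLevelSketch {
        public static void main(String[] args) {
            ExactlyOnceSupportLevel lvl = ExactlyOnceSupportLevel.fromProperty("Required"); // REQUIRED
            String shown = lvl.toString();                                                  // "required"
            // The display form parses back to the same constant
            System.out.println(ExactlyOnceSupportLevel.fromProperty(shown) == lvl);         // true
        }
    }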
@@ -58,23 +119,87 @@ public class SourceConnectorConfig extends ConnectorConfig {
         }
     }
 
-    private static final ConfigDef CONFIG = SourceConnectorConfig.configDef();
+    private final TransactionBoundary transactionBoundary;
+    private final Long transactionBoundaryInterval;
     private final EnrichedSourceConnectorConfig enrichedSourceConfig;
+    private final String offsetsTopic;
 
     public static ConfigDef configDef() {
+        ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
         int orderInGroup = 0;
         return new ConfigDef(ConnectorConfig.configDef())
-                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
-                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
-                            (name, value) -> {
-                                List<?> groupAliases = (List<?>) value;
-                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
-                                    throw new ConfigException(name, value, "Duplicate alias provided.");
-                                }
-                            },
-                            () -> "unique topic creation groups")),
-                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
-                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
+                .define(
+                        TOPIC_CREATION_GROUPS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(
+                                new ConfigDef.NonNullValidator(),
+                                ConfigDef.LambdaValidator.with(
+                                        (name, value) -> {
+                                            List<?> groupAliases = (List<?>) value;
+                                            if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                                throw new ConfigException(name, value, "Duplicate alias provided.");
+                                            }
+                                        },
+                                        () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW,
+                        TOPIC_CREATION_GROUPS_DOC,
+                        TOPIC_CREATION_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.LONG,
+                        TOPIC_CREATION_GROUPS_DISPLAY)
+                .define(
+                        EXACTLY_ONCE_SUPPORT_CONFIG,
+                        ConfigDef.Type.STRING,
+                        REQUESTED.toString(),
+                        ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSupportLevel.class)),
+                        ConfigDef.Importance.MEDIUM,
+                        EXACTLY_ONCE_SUPPORT_DOC,
+                        EXACTLY_ONCE_SUPPORT_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.SHORT,
+                        EXACTLY_ONCE_SUPPORT_DISPLAY)
+                .define(
+                        TRANSACTION_BOUNDARY_CONFIG,
+                        ConfigDef.Type.STRING,
+                        DEFAULT.toString(),
+                        ConfigDef.CaseInsensitiveValidString.in(enumOptions(TransactionBoundary.class)),
+                        ConfigDef.Importance.MEDIUM,
+                        TRANSACTION_BOUNDARY_DOC,
+                        EXACTLY_ONCE_SUPPORT_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.SHORT,
+                        TRANSACTION_BOUNDARY_DISPLAY)
+                .define(
+                        TRANSACTION_BOUNDARY_INTERVAL_CONFIG,
+                        ConfigDef.Type.LONG,
+                        null,
+                        ConfigDef.LambdaValidator.with(
+                                (name, value) -> {
+                                    if (value == null) {
+                                        return;
+                                    }
+                                    atLeastZero.ensureValid(name, value);
+                                },
+                                atLeastZero::toString
+                        ),
+                        ConfigDef.Importance.LOW,
+                        TRANSACTION_BOUNDARY_INTERVAL_DOC,
+                        EXACTLY_ONCE_SUPPORT_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.SHORT,
+                        TRANSACTION_BOUNDARY_INTERVAL_DISPLAY)
+                .define(
+                        OFFSETS_TOPIC_CONFIG,
+                        ConfigDef.Type.STRING,
+                        null,
+                        new ConfigDef.NonEmptyString(),
+                        ConfigDef.Importance.LOW,
+                        OFFSETS_TOPIC_DOC,
+                        OFFSETS_TOPIC_GROUP,
+                        orderInGroup = 1,
+                        ConfigDef.Width.LONG,
+                        OFFSETS_TOPIC_DISPLAY);
     }
 
     public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
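
One detail worth calling out above: transaction.boundary.interval.ms defaults to null, meaning "fall back to the worker's offset commit interval", so its validator has to let null through before delegating to Range.atLeast(0). The same pattern as a reusable helper, sketched here with a made-up name (not part of the patch):

    import org.apache.kafka.common.config.ConfigDef;

    public class NullableValidators {
        // Wraps a delegate validator so that null (i.e. "property unset") passes validation
        public static ConfigDef.Validator nullable(ConfigDef.Validator delegate) {
            return ConfigDef.LambdaValidator.with(
                    (name, value) -> {
                        if (value == null) {
                            return; // unset: the runtime supplies a fallback elsewhere
                        }
                        delegate.ensureValid(name, value);
                    },
                    delegate::toString);
        }
    }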
@@ -116,9 +241,9 @@ public class SourceConnectorConfig extends ConnectorConfig {
     }
 
     public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean createTopics) {
-        super(plugins, CONFIG, props);
+        super(plugins, configDef(), props);
         if (createTopics && props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
-            ConfigDef defaultConfigDef = embedDefaultGroup(CONFIG);
+            ConfigDef defaultConfigDef = embedDefaultGroup(configDef());
             // This config is only used to set default values for partitions and replication
             // factor from the default group and otherwise it remains unused
             AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef, props, false);
@@ -135,6 +260,13 @@ public class SourceConnectorConfig extends ConnectorConfig {
         } else {
             enrichedSourceConfig = null;
         }
+        transactionBoundary = TransactionBoundary.fromProperty(getString(TRANSACTION_BOUNDARY_CONFIG));
+        transactionBoundaryInterval = getLong(TRANSACTION_BOUNDARY_INTERVAL_CONFIG);
+        offsetsTopic = getString(OFFSETS_TOPIC_CONFIG);
+    }
+
+    public static boolean usesTopicCreation(Map<String, String> props) {
+        return props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX));
     }
 
     @Override
@@ -142,6 +274,18 @@ public class SourceConnectorConfig extends ConnectorConfig {
         return enrichedSourceConfig != null ? enrichedSourceConfig.get(key) : super.get(key);
     }
 
+    public TransactionBoundary transactionBoundary() {
+        return transactionBoundary;
+    }
+
+    public Long transactionBoundaryInterval() {
+        return transactionBoundaryInterval;
+    }
+
+    public String offsetsTopic() {
+        return offsetsTopic;
+    }
+
     /**
      * Returns whether this configuration uses topic creation properties.
      *
@@ -181,6 +325,6 @@ public class SourceConnectorConfig extends ConnectorConfig {
     }
 
     public static void main(String[] args) {
-        System.out.println(CONFIG.toHtml(4, config -> "sourceconnectorconfigs_" + config));
+        System.out.println(configDef().toHtml(4, config -> "sourceconnectorconfigs_" + config));
     }
 }
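
Putting this file's changes together, a hedged sketch of a connector configuration exercising the new properties and accessors (the Plugins instance and the connector class are assumed; "transaction.boundary" is the value of SourceTask.TRANSACTION_BOUNDARY_CONFIG):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.runtime.SourceConnectorConfig;
    import org.apache.kafka.connect.runtime.isolation.Plugins;

    public class SourceConnectorConfigSketch {
        public static void sketch(Plugins plugins) { // assumed to be supplied by the worker
            Map<String, String> props = new HashMap<>();
            props.put("name", "my-source");                                // hypothetical
            props.put("connector.class", "com.example.MySourceConnector"); // hypothetical
            props.put("exactly.once.support", "required");
            props.put("transaction.boundary", "interval");
            props.put("transaction.boundary.interval.ms", "60000");
            props.put("offsets.storage.topic", "my-source-offsets");

            SourceConnectorConfig config = new SourceConnectorConfig(plugins, props, false);
            System.out.println(config.transactionBoundary());         // interval
            System.out.println(config.transactionBoundaryInterval()); // 60000
            System.out.println(config.offsetsTopic());                // my-source-offsets
        }
    }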

WorkerConfig.java

@@ -111,7 +111,8 @@ public class WorkerConfig extends AbstractConfig {
     private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
             = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
             + " committed to offset storage before cancelling the process and restoring the offset "
-            + "data to be committed in a future attempt.";
+            + "data to be committed in a future attempt. This property has no effect for source connectors "
+            + "running with exactly-once support.";
     public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
 
     public static final String LISTENERS_CONFIG = "listeners";
@@ -343,6 +344,15 @@ public class WorkerConfig extends AbstractConfig {
         }
     }
 
+    /**
+     * @return the {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG bootstrap servers} property
+     * used by the worker when instantiating Kafka clients for connectors and tasks (unless overridden)
+     * and its internal topics (if running in distributed mode)
+     */
+    public String bootstrapServers() {
+        return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
+    }
+
     public Integer getRebalanceTimeout() {
         return null;
     }
@@ -351,6 +361,54 @@ public class WorkerConfig extends AbstractConfig {
         return getBoolean(TOPIC_CREATION_ENABLE_CONFIG);
     }
 
+    /**
+     * Whether this worker is configured with exactly-once support for source connectors.
+     * The default implementation returns {@code false} and should be overridden by subclasses
+     * if the worker mode for the subclass provides exactly-once support for source connectors.
+     * @return whether exactly-once support is enabled for source connectors on this worker
+     */
+    public boolean exactlyOnceSourceEnabled() {
+        return false;
+    }
+
+    /**
+     * Get the internal topic used by this worker to store source connector offsets.
+     * The default implementation returns {@code null} and should be overridden by subclasses
+     * if the worker mode for the subclass uses an internal offsets topic.
+     * @return the name of the internal offsets topic, or {@code null} if the worker does not use
+     * an internal offsets topic
+     */
+    public String offsetsTopic() {
+        return null;
+    }
+
+    /**
+     * Determine whether this worker supports per-connector source offsets topics.
+     * The default implementation returns {@code false} and should be overridden by subclasses
+     * if the worker mode for the subclass supports per-connector offsets topics.
+     * @return whether the worker supports per-connector offsets topics
+     */
+    public boolean connectorOffsetsTopicsPermitted() {
+        return false;
+    }
+
+    /**
+     * @return the offset commit interval for tasks created by this worker
+     */
+    public long offsetCommitInterval() {
+        return getLong(OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+    }
+
+    /**
+     * Get the {@link CommonClientConfigs#GROUP_ID_CONFIG group ID} used by this worker to form a cluster.
+     * The default implementation returns {@code null} and should be overridden by subclasses
+     * if the worker mode for the subclass is capable of forming a cluster using Kafka's group coordination API.
+     * @return the group ID for the worker's cluster, or {@code null} if the worker is not capable of forming a cluster
+     */
+    public String groupId() {
+        return null;
+    }
+
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
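
These defaults form a small template-method surface: DistributedConfig (below) overrides them, while standalone workers keep the defaults. A sketch of runtime code written against the hooks, mode-agnostically (the class and method here are hypothetical, not from the patch):

    import java.util.Map;

    import org.apache.kafka.connect.runtime.WorkerConfig;

    public class OffsetsTopicResolution {
        // Resolve which offsets topic a source connector should use
        public static String resolveOffsetsTopic(WorkerConfig workerConfig, Map<String, String> connectorProps) {
            String connectorTopic = connectorProps.get("offsets.storage.topic");
            if (connectorTopic != null && workerConfig.connectorOffsetsTopicsPermitted()) {
                return connectorTopic;          // per-connector topic, honored in distributed mode
            }
            return workerConfig.offsetsTopic(); // worker's global topic, or null in standalone mode
        }
    }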

DistributedConfig.java

@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
@@ -32,6 +33,7 @@ import java.security.InvalidParameterException;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static org.apache.kafka.common.utils.Utils.enumOptions;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
@@ -194,6 +197,34 @@ public class DistributedConfig extends WorkerConfig {
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
     public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    private enum ExactlyOnceSourceSupport {
+        DISABLED(false),
+        PREPARING(true),
+        ENABLED(true);
+
+        public final boolean usesTransactionalLeader;
+
+        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
+            this.usesTransactionalLeader = usesTransactionalLeader;
+        }
+
+        public static ExactlyOnceSourceSupport fromProperty(String property) {
+            return ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT));
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
+            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. ";
+            // TODO: https://issues.apache.org/jira/browse/KAFKA-13709
+            //       + "See the exactly-once source support documentation at [add docs link here] for more information on this feature.";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString();
+
     @SuppressWarnings("unchecked")
     private static final ConfigDef CONFIG = baseConfigDef()
             .define(GROUP_ID_CONFIG,
@@ -215,6 +246,12 @@ public class DistributedConfig extends WorkerConfig {
                     Math.toIntExact(TimeUnit.SECONDS.toMillis(3)),
                     ConfigDef.Importance.HIGH,
                     HEARTBEAT_INTERVAL_MS_DOC)
+            .define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+                    ConfigDef.Type.STRING,
+                    EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
+                    ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
+                    ConfigDef.Importance.HIGH,
+                    EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
             .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
                     ConfigDef.Type.LONG,
                     TimeUnit.MINUTES.toMillis(5),
@@ -399,13 +436,57 @@ public class DistributedConfig extends WorkerConfig {
                     ConfigDef.Importance.LOW,
                     INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
 
+    private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
+
     @Override
     public Integer getRebalanceTimeout() {
         return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
     }
 
+    @Override
+    public boolean exactlyOnceSourceEnabled() {
+        return exactlyOnceSourceSupport == ExactlyOnceSourceSupport.ENABLED;
+    }
+
+    /**
+     * @return whether the Connect cluster's leader should use a transactional producer to perform writes to the config
+     * topic, which is useful for ensuring that zombie leaders are fenced out and unable to write to the topic after a
+     * new leader has been elected.
+     */
+    public boolean transactionalLeaderEnabled() {
+        return exactlyOnceSourceSupport.usesTransactionalLeader;
+    }
+
+    /**
+     * @return the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG transactional ID} to use for the worker's producer if
+     * using a transactional producer for writes to internal topics such as the config topic.
+     */
+    public String transactionalProducerId() {
+        return transactionalProducerId(groupId());
+    }
+
+    public static String transactionalProducerId(String groupId) {
+        return "connect-cluster-" + groupId;
+    }
+
+    @Override
+    public String offsetsTopic() {
+        return getString(OFFSET_STORAGE_TOPIC_CONFIG);
+    }
+
+    @Override
+    public boolean connectorOffsetsTopicsPermitted() {
+        return true;
+    }
+
+    @Override
+    public String groupId() {
+        return getString(GROUP_ID_CONFIG);
+    }
+
     public DistributedConfig(Map<String, String> props) {
         super(CONFIG, props);
+        exactlyOnceSourceSupport = ExactlyOnceSourceSupport.fromProperty(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG));
         getInternalRequestKeyGenerator(); // Check here for a valid key size + key algorithm to fail fast if either are invalid
         validateKeyAlgorithmAndVerificationAlgorithms();
     }
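
End to end, the worker-level knob behaves as sketched below. Note that PREPARING already switches the leader to a transactional producer, which appears to be what allows a cluster to be rolled before flipping the value to "enabled". The baseWorkerProps parameter stands in for the required worker properties (group ID, converters, and the three internal storage topics), much like configs() in the test file below:

    import java.util.Map;

    import org.apache.kafka.connect.runtime.distributed.DistributedConfig;

    public class ExactlyOnceSupportSketch {
        static DistributedConfig sketch(Map<String, String> baseWorkerProps) { // assumed to be complete
            baseWorkerProps.put("exactly.once.source.support", "preparing");
            DistributedConfig config = new DistributedConfig(baseWorkerProps);
            config.exactlyOnceSourceEnabled();   // false: only "enabled" turns the feature on
            config.transactionalLeaderEnabled(); // true: the leader already writes transactionally
            config.transactionalProducerId();    // "connect-cluster-" + group.id
            return config;
        }
    }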

AbstractHerderTest.java

@@ -53,6 +53,7 @@ import org.easymock.EasyMock;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -142,10 +143,10 @@ public class AbstractHerderTest {
 
     @MockStrict private Worker worker;
     @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
     @MockStrict private ConfigBackingStore configStore;
     @MockStrict private StatusBackingStore statusStore;
+    @MockStrict private ClassLoader classLoader;
+    @Mock private Plugins plugins;
 
     @Test
     public void testConnectors() {
@@ -436,13 +437,18 @@ public class AbstractHerderTest {
         // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
         // the config fields for SourceConnectorConfig, but we expect these to change rarely.
         assertEquals(SampleSourceConnector.class.getName(), result.name());
-        assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP,
-                ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP, SourceConnectorConfig.TOPIC_CREATION_GROUP), result.groups());
+        assertEquals(
+                Arrays.asList(
+                        ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP,
+                        ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP,
+                        SourceConnectorConfig.TOPIC_CREATION_GROUP, SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                        SourceConnectorConfig.OFFSETS_TOPIC_GROUP),
+                result.groups());
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
-        // Base connector config has 14 fields, connector's configs add 2
-        assertEquals(17, infos.size());
+        // Base connector config has 14 fields, connector's configs add 7
+        assertEquals(21, infos.size());
         // Missing name should generate an error
         assertEquals(ConnectorConfig.NAME_CONFIG,
                 infos.get(ConnectorConfig.NAME_CONFIG).configValue().name());
@@ -531,6 +537,8 @@ public class AbstractHerderTest {
                 ConnectorConfig.PREDICATES_GROUP,
                 ConnectorConfig.ERROR_GROUP,
                 SourceConnectorConfig.TOPIC_CREATION_GROUP,
+                SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
                 "Transforms: xformA",
                 "Transforms: xformB"
         );
@@ -538,7 +546,7 @@ public class AbstractHerderTest {
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
-        assertEquals(22, infos.size());
+        assertEquals(26, infos.size());
         // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
         assertEquals("transforms.xformA.type",
                 infos.get("transforms.xformA.type").configValue().name());
@@ -590,6 +598,8 @@ public class AbstractHerderTest {
                 ConnectorConfig.PREDICATES_GROUP,
                 ConnectorConfig.ERROR_GROUP,
                 SourceConnectorConfig.TOPIC_CREATION_GROUP,
+                SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
                 "Transforms: xformA",
                 "Predicates: predX",
                 "Predicates: predY"
@@ -598,7 +608,7 @@ public class AbstractHerderTest {
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
-        assertEquals(24, infos.size());
+        assertEquals(28, infos.size());
         // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
         assertEquals("transforms.xformA.type",
                 infos.get("transforms.xformA.type").configValue().name());
@@ -659,12 +669,14 @@ public class AbstractHerderTest {
                 ConnectorConfig.TRANSFORMS_GROUP,
                 ConnectorConfig.PREDICATES_GROUP,
                 ConnectorConfig.ERROR_GROUP,
-                SourceConnectorConfig.TOPIC_CREATION_GROUP
+                SourceConnectorConfig.TOPIC_CREATION_GROUP,
+                SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                SourceConnectorConfig.OFFSETS_TOPIC_GROUP
         );
         assertEquals(expectedGroups, result.groups());
         assertEquals(1, result.errorCount());
-        // Base connector config has 14 fields, connector's configs add 2, and 2 producer overrides
-        assertEquals(19, result.values().size());
+        // Base connector config has 14 fields, connector's configs add 7, and 2 producer overrides
+        assertEquals(23, result.values().size());
         assertTrue(result.values().stream().anyMatch(
                 configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
         assertTrue(result.values().stream().anyMatch(

DistributedConfigTest.java

@@ -28,11 +28,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.Assert.assertTrue;
 
 public class DistributedConfigTest {
@@ -306,4 +309,42 @@ public class DistributedConfigTest {
                 () -> new DistributedConfig(configs));
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void shouldIdentifyNeedForTransactionalLeader() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
+        assertFalse(new DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        assertTrue(new DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        assertTrue(new DistributedConfig(workerProps).transactionalLeaderEnabled());
+    }
+
+    @Test
+    public void shouldConstructExpectedTransactionalId() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(GROUP_ID_CONFIG, "why did i stay up all night writing unit tests");
+        assertEquals(
+                "connect-cluster-why did i stay up all night writing unit tests",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "connect-cluster");
+        assertEquals(
+                "connect-cluster-connect-cluster",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "\u2603");
+        assertEquals(
+                "connect-cluster-\u2603",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+    }
 }
} }