mirror of https://github.com/apache/kafka.git
KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (#13106)
This commit is contained in:
parent
a2926edc2f
commit
a382acd31d
|
@ -17,15 +17,15 @@
|
|||
package org.apache.kafka.connect.source;
|
||||
|
||||
/**
|
||||
* An enum to represent the level of support for exactly-once delivery from a source connector.
|
||||
* An enum to represent the level of support for exactly-once semantics from a source connector.
|
||||
*/
|
||||
public enum ExactlyOnceSupport {
|
||||
/**
|
||||
* Signals that a connector supports exactly-once delivery.
|
||||
* Signals that a connector supports exactly-once semantics.
|
||||
*/
|
||||
SUPPORTED,
|
||||
/**
|
||||
* Signals that a connector does not support exactly-once delivery.
|
||||
* Signals that a connector does not support exactly-once semantics.
|
||||
*/
|
||||
UNSUPPORTED
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ public abstract class SourceConnector extends Connector {
|
|||
}
|
||||
|
||||
/**
|
||||
* Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
|
||||
* Signals whether the connector supports exactly-once semantics with a proposed configuration.
|
||||
* Connector authors can assume that worker-level exactly-once support is enabled when this method is invoked.
|
||||
*
|
||||
* <p>For backwards compatibility, the default implementation will return {@code null}, but connector authors are
|
||||
|
@ -46,7 +46,7 @@ public abstract class SourceConnector extends Connector {
|
|||
* @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
|
||||
* configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
|
||||
* connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
|
||||
* exactly-once guarantees.
|
||||
* exactly-once semantics.
|
||||
* @since 3.3
|
||||
*/
|
||||
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
|
||||
|
|
|
@ -92,9 +92,9 @@ public class FileStreamSourceConnector extends SourceConnector {
|
|||
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
|
||||
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
|
||||
String filename = parsedConfig.getString(FILE_CONFIG);
|
||||
// We can provide exactly-once guarantees if reading from a "real" file
|
||||
// We can provide exactly-once semantics if reading from a "real" file
|
||||
// (as long as the file is only appended to over the lifetime of the connector)
|
||||
// If we're reading from stdin, we can't provide exactly-once guarantees
|
||||
// If we're reading from stdin, we can't provide exactly-once semantics
|
||||
// since we don't even track offsets
|
||||
return filename != null && !filename.isEmpty()
|
||||
? ExactlyOnceSupport.SUPPORTED
|
||||
|
|
|
@ -69,7 +69,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
|
||||
|
||||
/**
|
||||
* WorkerTask that contains shared logic for running source tasks with either standard or exactly-once delivery guarantees.
|
||||
* WorkerTask that contains shared logic for running source tasks with either standard semantics
|
||||
* (i.e., either at-least-once or at-most-once) or exactly-once semantics.
|
||||
*/
|
||||
public abstract class AbstractWorkerSourceTask extends WorkerTask {
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractWorkerSourceTask.class);
|
||||
|
|
|
@ -55,7 +55,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
|
||||
/**
|
||||
* WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once delivery guarantees.
|
||||
* WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once semantics.
|
||||
*/
|
||||
class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
|
||||
private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
|
||||
|
|
|
@ -79,8 +79,8 @@ public class SourceConnectorConfig extends ConnectorConfig {
|
|||
|
||||
public static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
|
||||
private static final String EXACTLY_ONCE_SUPPORT_DOC = "Permitted values are " + String.join(", ", enumOptions(ExactlyOnceSupportLevel.class)) + ". "
|
||||
+ "If set to \"" + REQUIRED + "\", forces a preflight check for the connector to ensure that it can provide exactly-once delivery "
|
||||
+ "with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to "
|
||||
+ "If set to \"" + REQUIRED + "\", forces a preflight check for the connector to ensure that it can provide exactly-once semantics "
|
||||
+ "with the given configuration. Some connectors may be capable of providing exactly-once semantics but not signal to "
|
||||
+ "Connect that they support this; in that case, documentation for the connector should be consulted carefully before "
|
||||
+ "creating it, and the value for this property should be set to \"" + REQUESTED + "\". "
|
||||
+ "Additionally, if the value is set to \"" + REQUIRED + "\" but the worker that performs preflight validation does not have "
|
||||
|
|
|
@ -822,7 +822,7 @@ public class Worker {
|
|||
Map<String, Object> result = baseConsumerConfigs(
|
||||
connName, defaultClientId, config, connConfig, connectorClass,
|
||||
connectorClientConfigOverridePolicy, clusterId, ConnectorType.SOURCE);
|
||||
// Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyways
|
||||
// Users can disable this if they want to since the task isn't exactly-once anyways
|
||||
result.putIfAbsent(
|
||||
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
|
||||
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
|
||||
|
|
|
@ -942,18 +942,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
|||
if (exactlyOnceSupport == null) {
|
||||
validationErrorMessage = "The connector does not implement the API required for preflight validation of exactly-once "
|
||||
+ "source support. Please consult the documentation for the connector to determine whether it supports exactly-once "
|
||||
+ "guarantees, and then consider reconfiguring the connector to use the value \""
|
||||
+ "semantics, and then consider reconfiguring the connector to use the value \""
|
||||
+ SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED
|
||||
+ "\" for this property (which will disable this preflight check and allow the connector to be created).";
|
||||
} else if (ExactlyOnceSupport.UNSUPPORTED.equals(exactlyOnceSupport)) {
|
||||
validationErrorMessage = "The connector does not support exactly-once delivery guarantees with the provided configuration.";
|
||||
validationErrorMessage = "The connector does not support exactly-once semantics with the provided configuration.";
|
||||
} else {
|
||||
throw new ConnectException("Unexpected value returned from SourceConnector::exactlyOnceSupport: " + exactlyOnceSupport);
|
||||
}
|
||||
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed while validating connector support for exactly-once guarantees", e);
|
||||
log.error("Failed while validating connector support for exactly-once semantics", e);
|
||||
String validationErrorMessage = "An unexpected error occurred during validation";
|
||||
String failureMessage = e.getMessage();
|
||||
if (failureMessage != null && !failureMessage.trim().isEmpty()) {
|
||||
|
|
|
@ -159,7 +159,7 @@ public class ExactlyOnceSourceIntegrationTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* A simple test for the pre-flight validation API for connectors to provide their own delivery guarantees.
|
||||
* A simple test for the pre-flight validation API for connectors to provide their own guarantees for exactly-once semantics.
|
||||
*/
|
||||
@Test
|
||||
public void testPreflightValidation() {
|
||||
|
@ -727,8 +727,8 @@ public class ExactlyOnceSourceIntegrationTest {
|
|||
* Then, a "soft downgrade" is simulated: the Connect cluster is shut down and reconfigured to disable
|
||||
* exactly-once support. The cluster is brought up again, the connector is allowed to produce some data,
|
||||
* the connector is shut down, and this time, the records the connector has produced are inspected for
|
||||
* accuracy. Because of the downgrade, exactly-once guarantees are lost, but we check to make sure that
|
||||
* the task has maintained exactly-once delivery <i>up to the last-committed record</i>.
|
||||
* accuracy. Because of the downgrade, exactly-once semantics are lost, but we check to make sure that
|
||||
* the task has maintained exactly-once semantics <i>up to the last-committed record</i>.
|
||||
*/
|
||||
@Test
|
||||
public void testSeparateOffsetsTopic() throws Exception {
|
||||
|
@ -858,7 +858,7 @@ public class ExactlyOnceSourceIntegrationTest {
|
|||
);
|
||||
assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
|
||||
sourceRecords.count() >= recordsProduced);
|
||||
// also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees
|
||||
// also have to check which offsets have actually been committed, since we no longer have exactly-once semantics
|
||||
offsetRecords = connectorTargetedCluster.consumeAll(
|
||||
CONSUME_RECORDS_TIMEOUT_MS,
|
||||
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
|
||||
|
|
|
@ -855,7 +855,7 @@ public class DistributedHerderTest {
|
|||
|
||||
List<String> errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
|
||||
assertEquals(
|
||||
Collections.singletonList("The connector does not support exactly-once delivery guarantees with the provided configuration."),
|
||||
Collections.singletonList("The connector does not support exactly-once semantics with the provided configuration."),
|
||||
errors);
|
||||
|
||||
PowerMock.verifyAll();
|
||||
|
|
|
@ -371,15 +371,15 @@ errors.tolerance=all</pre>
|
|||
|
||||
<h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
|
||||
|
||||
<p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
|
||||
<p>Kafka Connect is capable of providing exactly-once semantics for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once semantics is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
|
||||
|
||||
<h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
|
||||
|
||||
<p>If a sink connector supports exactly-once delivery, to enable exactly-once delivery at the Connect worker level, you must ensure its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
|
||||
<p>If a sink connector supports exactly-once semantics, to enable exactly-once at the Connect worker level, you must ensure its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
|
||||
|
||||
<h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5>
|
||||
|
||||
<p>If a source connector supports exactly-once delivery, you must configure your Connect cluster to enable framework-level support for exactly-once delivery for source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>
|
||||
<p>If a source connector supports exactly-once semantics, you must configure your Connect cluster to enable framework-level support for exactly-once source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once semantics.</p>
|
||||
|
||||
<h6>Worker configuration</h6>
|
||||
|
||||
|
@ -770,24 +770,24 @@ public List<SourceRecord> poll() {
|
|||
|
||||
<p>A few additional preflight validation APIs can be implemented by source connector developers.</p>
|
||||
|
||||
<p>Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
|
||||
<p>Some users may require exactly-once semantics from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once semantics with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
|
||||
|
||||
<p>If a connector doesn't support exactly-once delivery, it should still implement this method to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>
|
||||
<p>If a connector doesn't support exactly-once semantics, it should still implement this method to let users know for certain that it cannot provide exactly-once semantics:</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
@Override
|
||||
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
|
||||
// This connector cannot provide exactly-once delivery guarantees under any conditions
|
||||
// This connector cannot provide exactly-once semantics under any conditions
|
||||
return ExactlyOnceSupport.UNSUPPORTED;
|
||||
}
|
||||
</pre>
|
||||
|
||||
<p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once delivery guarantees:</p>
|
||||
<p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once semantics:</p>
|
||||
|
||||
<pre class="brush: java;">
|
||||
@Override
|
||||
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
|
||||
// This connector can always provide exactly-once delivery guarantees
|
||||
// This connector can always provide exactly-once semantics
|
||||
return ExactlyOnceSupport.SUPPORTED;
|
||||
}
|
||||
</pre>
|
||||
|
|
Loading…
Reference in New Issue