diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
index 98b09a38d36..d60eea25710 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
@@ -26,7 +26,7 @@ package org.apache.kafka.common.utils;
* This class also ensures monotonic updates to the timer even if the underlying clock is subject
* to non-monotonic behavior. For example, the remaining time returned by {@link #remainingMs()} is
* guaranteed to decrease monotonically until it hits zero.
- *
+ *
* Note that it is up to the caller to ensure progress of the timer using one of the
* {@link #update()} methods or {@link #sleep(long)}. The timer will cache the current time and
* return it indefinitely until the timer has been updated. This allows the caller to limit
@@ -34,14 +34,14 @@ package org.apache.kafka.common.utils;
* waiting for a request sent through the {@link org.apache.kafka.clients.NetworkClient} should call
* {@link #update()} following each blocking call to
* {@link org.apache.kafka.clients.NetworkClient#poll(long, long)}.
- *
+ *
* A typical usage might look something like this:
*
* <pre>
* Time time = Time.SYSTEM;
* Timer timer = time.timer(500);
*
- * while (!conditionSatisfied() && timer.notExpired) {
+ * while (!conditionSatisfied() && timer.notExpired()) {
* client.poll(timer.remainingMs(), timer.currentTimeMs());
* timer.update();
* }
@@ -137,7 +137,7 @@ public class Timer {
/**
* Update the cached current time to a specific value. In some contexts, the caller may already
* have an accurate time, so this avoids unnecessary calls to system time.
- *
+ *
* Note that if the updated current time is smaller than the cached time, then the update
* is ignored.
*
@@ -161,7 +161,7 @@ public class Timer {
/**
* Get the current time in milliseconds. This will return the same cached value until the timer
* has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
- *
+ *
* Note that the value returned is guaranteed to increase monotonically even if the underlying
* {@link Time} implementation goes backwards. Effectively, the timer will just wait for the
* time to catch up.
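
For illustration, the semantics this javadoc documents in a runnable sketch. MockTime is the deterministic clock that ships with Kafka's client test utilities; the class name here is invented for the example:

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Timer;

// A minimal sketch of the documented Timer semantics: an update with an
// earlier timestamp is ignored, so remainingMs() only ever decreases.
public class TimerSemanticsSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        Timer timer = time.timer(500);            // 500 ms budget

        time.sleep(100);                          // advance the mock clock
        timer.update();                           // timer observes 100 ms elapsed
        System.out.println(timer.remainingMs());  // 400

        timer.update(time.milliseconds() - 50);   // "backwards" update: ignored
        System.out.println(timer.remainingMs());  // still 400
    }
}
```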
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 86f79e8cbf1..f4a85eb59dc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1065,7 +1065,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.
ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
- // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
connectorType(config));
callback.onCompletion(null, new Created<>(!exists, info));
return null;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index bceb73e1715..e9f87697308 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
@@ -33,6 +34,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -40,11 +42,13 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
@@ -70,6 +74,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@@ -79,26 +84,45 @@ import java.util.function.Supplier;
* Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
*
*
- * This class manages both connector and task configurations. It tracks three types of configuration entries:
+ * This class manages both connector and task configurations, among various other configurations. It tracks seven types
+ * of records:
*
- * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
+ *
+ * - Connector config: map of string -> string configurations passed to the Connector class, with support for
* expanding this format if necessary. (Kafka key: connector-[connector-id]).
* These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
- * cluster goes down, this is all that is really needed to recover.
- * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
+ * cluster goes down, this is all that is really needed to recover.
+ * - Task configs: map of string -> string configurations passed to the Task class, with support for expanding
* this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
- * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
- * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
- * of recovery (restoring a connector) will simply result in the same configuration as before
- * the failure.
- * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
+ * These configs are ephemeral; they are stored here to:
+ *
+ * - disseminate them to all workers while ensuring agreement
+ * - allow faster cluster/worker recovery, since the common case of recovery (restoring a connector) will simply
+ * result in the same task configuration as before the failure
+ *
+ * - Task commit "configs": records indicating that previous task config entries should be committed and all task
* configs for a connector can be applied. (Kafka key: commit-[connector-id]).
* This config has two effects. First, it records the number of tasks the connector is currently
* running (and can therefore increase/decrease parallelism). Second, because each task config
* is stored separately but they need to be applied together to ensure each partition is assigned
* to a single task, this record also indicates that task configs for the specified connector
- * can be "applied" or "committed".
- *
+ * can be "applied" or "committed".
+ * - Connector target states: records indicating the {@link TargetState} for a connector
+ * - {@link RestartRequest Restart requests}: records representing requests to restart a connector and / or its
+ * tasks. See KIP-745 for more details.
+ * - Task count records: an integer value that tracks the number of task producers (for source connectors) that
+ * will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task
+ * configurations. This is required for exactly-once support for source connectors; see KIP-618 for more details.
+ * - Session key records: a {@link SessionKey} generated by the leader of the cluster when
+ * {@link ConnectProtocolCompatibility#SESSIONED} is the Connect protocol being used by the cluster. This session key
+ * is used to verify internal REST requests between workers in the cluster. See KIP-507 for more details.
+ *
*
* This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
* ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
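
To make the key formats enumerated above concrete, a hypothetical helper is sketched below. The store itself builds these keys through static helpers such as CONNECTOR_KEY and TARGET_STATE_KEY (visible in the hunks further down); the method names here are invented for the example:

```java
// Hypothetical stand-ins for the record-key formats described in the javadoc,
// mapping the [connector-id]/[task-id] placeholders to concrete strings.
public class ConfigTopicKeysSketch {
    static String connectorKey(String connector) { return "connector-" + connector; }          // connector config
    static String taskKey(String connector, int task) { return "task-" + connector + "-" + task; } // task config
    static String commitKey(String connector) { return "commit-" + connector; }                // task commit record

    public static void main(String[] args) {
        System.out.println(connectorKey("my-sink"));   // connector-my-sink
        System.out.println(taskKey("my-sink", 0));     // task-my-sink-0
        System.out.println(commitKey("my-sink"));      // commit-my-sink
    }
}
```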
@@ -148,7 +172,7 @@ import java.util.function.Supplier;
* Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
* (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
* configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
- * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * This allows users of this class (i.e., {@link Herder} implementations) to take action to resolve any inconsistencies. These
* inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
* of updating task configurations).
*
@@ -241,7 +265,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
.field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
.build();
- private static final long READ_TO_END_TIMEOUT_MS = 30000;
+ // Visible for testing
+ static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
private final Object lock;
private final Converter converter;
@@ -289,6 +314,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private final boolean usesFencableWriter;
private volatile Producer<String, byte[]> fencableProducer;
private final Map<String, Object> fencableProducerProps;
+ private final Time time;
@Deprecated
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
@@ -296,6 +322,10 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
+ this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
+ }
+
+ KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase, Time time) {
this.lock = new Object();
this.started = false;
this.converter = converter;
@@ -320,6 +350,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
configLog = setupAndCreateKafkaBasedLog(this.topic, config);
this.configTransformer = configTransformer;
+ this.time = time;
}
@Override
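
The new package-private constructor exists so tests can substitute a deterministic clock. A self-contained sketch of the pattern follows; BudgetedStore is an illustrative stand-in, not the store itself:

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

// Clock injection in miniature: production code passes Time.SYSTEM, tests pass
// a MockTime they can advance deterministically to exercise timeout paths.
class BudgetedStore {
    private final Time time;

    BudgetedStore(Time time) {
        this.time = time;
    }

    // Returns how much of the total budget a follow-up read would receive
    // after a (simulated) blocking write consumed simulatedWriteMs.
    long remainingAfterWrite(long totalTimeoutMs, long simulatedWriteMs) {
        Timer timer = time.timer(totalTimeoutMs);
        time.sleep(simulatedWriteMs); // stand-in for the blocking produce
        timer.update();
        return timer.remainingMs();
    }

    public static void main(String[] args) {
        BudgetedStore store = new BudgetedStore(new MockTime());
        System.out.println(store.remainingAfterWrite(30_000, 20_000)); // 10000
    }
}
```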
@@ -469,8 +500,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectConfig.put("properties", properties);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
try {
- sendPrivileged(CONNECTOR_KEY(connector), serializedConfig);
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+ sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write connector configuration to Kafka: ", e);
throw new ConnectException("Error writing connector configuration to Kafka", e);
@@ -490,9 +522,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
public void removeConnectorConfig(String connector) {
log.debug("Removing connector configuration for connector '{}'", connector);
try {
- sendPrivileged(CONNECTOR_KEY(connector), null);
- sendPrivileged(TARGET_STATE_KEY(connector), null);
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+ List<ProducerKeyValue> keyValues = Arrays.asList(
+ new ProducerKeyValue(CONNECTOR_KEY(connector), null),
+ new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
+ );
+ sendPrivileged(keyValues, timer);
+
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to remove connector configuration from Kafka: ", e);
throw new ConnectException("Error removing connector configuration from Kafka", e);
@@ -521,10 +558,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
*/
@Override
public void putTaskConfigs(String connector, List