KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
Yash Mayya 2023-02-02 21:33:38 +05:30 committed by GitHub
parent 50e0e3c257
commit a3cf8b54e0
8 changed files with 353 additions and 57 deletions

View File

@ -26,7 +26,7 @@ package org.apache.kafka.common.utils;
* This class also ensures monotonic updates to the timer even if the underlying clock is subject
* to non-monotonic behavior. For example, the remaining time returned by {@link #remainingMs()} is
* guaranteed to decrease monotonically until it hits zero.
*
* <p>
* Note that it is up to the caller to ensure progress of the timer using one of the
* {@link #update()} methods or {@link #sleep(long)}. The timer will cache the current time and
* return it indefinitely until the timer has been updated. This allows the caller to limit
@ -34,14 +34,14 @@ package org.apache.kafka.common.utils;
* waiting a request sent through the {@link org.apache.kafka.clients.NetworkClient} should call
* {@link #update()} following each blocking call to
* {@link org.apache.kafka.clients.NetworkClient#poll(long, long)}.
*
* <p>
* A typical usage might look something like this:
*
* <pre>
* Time time = Time.SYSTEM;
* Timer timer = time.timer(500);
*
* while (!conditionSatisfied() && timer.notExpired) {
* while (!conditionSatisfied() && timer.notExpired()) {
* client.poll(timer.remainingMs(), timer.currentTimeMs());
* timer.update();
* }
@ -137,7 +137,7 @@ public class Timer {
/**
* Update the cached current time to a specific value. In some contexts, the caller may already
* have an accurate time, so this avoids unnecessary calls to system time.
*
* <p>
* Note that if the updated current time is smaller than the cached time, then the update
* is ignored.
*
@ -161,7 +161,7 @@ public class Timer {
/**
* Get the current time in milliseconds. This will return the same cached value until the timer
* has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
*
* <p>
* Note that the value returned is guaranteed to increase monotonically even if the underlying
* {@link Time} implementation goes backwards. Effectively, the timer will just wait for the
* time to catch up.
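
The pattern this change builds on is sharing a single Timer across several blocking calls so that they all draw from one overall timeout budget. A minimal sketch of that usage (illustrative only; sendFirstRecord/sendSecondRecord are hypothetical placeholders, not part of this diff):

    Time time = Time.SYSTEM;
    Timer timer = time.timer(30_000);                          // one 30-second budget for everything below

    Future<?> first = sendFirstRecord();                       // hypothetical blocking work
    first.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
    timer.update();                                            // advance the cached time after blocking

    Future<?> second = sendSecondRecord();                     // hypothetical
    second.get(timer.remainingMs(), TimeUnit.MILLISECONDS);    // only whatever budget is left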

View File

@ -1065,7 +1065,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.
ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
// validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
connectorType(config));
callback.onCompletion(null, new Created<>(!exists, info));
return null;
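
The net effect in the herder (verified by the new DistributedHerderTest case later in this diff) is roughly the following; treat it as an illustrative sketch rather than the literal herder code:

    // Illustrative only: a failed config-topic write now throws from the backing store,
    // and the error reaches the REST callback instead of a spurious 201 Created.
    try {
        configBackingStore.putConnectorConfig(connName, config);
        callback.onCompletion(null, new Created<>(!exists, info));
    } catch (ConnectException e) {
        callback.onCompletion(e, null);
    }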

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
@ -33,6 +34,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@ -40,11 +42,13 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
@ -70,6 +74,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@ -79,26 +84,45 @@ import java.util.function.Supplier;
* Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
* </p>
* <p>
* This class manages both connector and task configurations. It tracks three types of configuration entries:
* This class manages both connector and task configurations, among various other configurations. It tracks seven types
* of records:
* <p/>
* 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
* <ol>
* <li> Connector config: map of string -> string configurations passed to the Connector class, with support for
* expanding this format if necessary. (Kafka key: connector-[connector-id]).
* These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
* cluster goes down, this is all that is really needed to recover.
* 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
* cluster goes down, this is all that is really needed to recover. </li>
* <li> Task configs: map of string -> string configurations passed to the Task class, with support for expanding
* this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
* These configs are ephemeral; they are stored here to a) disseminate them to all workers while
* ensuring agreement and b) to allow faster cluster/worker recovery since the common case
* of recovery (restoring a connector) will simply result in the same configuration as before
* the failure.
* 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
* These configs are ephemeral; they are stored here to
* <ul>
* <li> disseminate them to all workers while ensuring agreement </li>
* <li> allow faster cluster/worker recovery since the common case of recovery (restoring a connector) will simply
* result in the same task configuration as before the failure. </li>
* </ul>
* </li>
* <li> Task commit "configs": records indicating that previous task config entries should be committed and all task
* configs for a connector can be applied. (Kafka key: commit-[connector-id]).
* This config has two effects. First, it records the number of tasks the connector is currently
* running (and can therefore increase/decrease parallelism). Second, because each task config
* is stored separately but they need to be applied together to ensure each partition is assigned
* to a single task, this record also indicates that task configs for the specified connector
* can be "applied" or "committed".
* </p>
* can be "applied" or "committed". </li>
* <li> Connector target states: records indicating the {@link TargetState} for a connector </li>
* <li> {@link RestartRequest Restart requests}: records representing requests to restart a connector and / or its
* tasks. See <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623">KIP-745</a> for more
* details.</li>
* <li> Task count records: an integer value that tracks the number of task producers (for source connectors) that
* will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task
* configurations. This is required for exactly-once support for source connectors, see
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>
* for more details.</li>
* <li> Session key records: A {@link SessionKey} generated by the leader of the cluster when
* {@link ConnectProtocolCompatibility#SESSIONED} is the Connect protocol being used by the cluster. This session key
* is used to verify internal REST requests between workers in the cluster. See
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints">KIP-507</a>
* for more details.</li>
* </ol>
* <p>
* This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
* ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
@ -148,7 +172,7 @@ import java.util.function.Supplier;
* Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
* (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
* configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
* This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
* This allows users of this class (i.e., {@link Herder} implementations) to take action to resolve any inconsistencies. These
* inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
* of updating task configurations).
* </p>
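
For quick reference, the record keys described in the Javadoc above follow simple string patterns. An illustrative sketch of the formats stated there (the target-state prefix also appears in the tests below); these are not verbatim copies of the class constants:

    String connectorKey   = "connector-" + connectorName;            // connector config
    String taskKey        = "task-" + connectorName + "-" + taskId;  // task config
    String commitKey      = "commit-" + connectorName;               // task commit record
    String targetStateKey = "target-state-" + connectorName;         // connector target state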
@ -241,7 +265,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
.field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
.build();
private static final long READ_TO_END_TIMEOUT_MS = 30000;
// Visible for testing
static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
private final Object lock;
private final Converter converter;
@ -289,6 +314,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private final boolean usesFencableWriter;
private volatile Producer<String, byte[]> fencableProducer;
private final Map<String, Object> fencableProducerProps;
private final Time time;
@Deprecated
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
@ -296,6 +322,10 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
}
KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase, Time time) {
this.lock = new Object();
this.started = false;
this.converter = converter;
@ -320,6 +350,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
configLog = setupAndCreateKafkaBasedLog(this.topic, config);
this.configTransformer = configTransformer;
this.time = time;
}
@Override
@ -469,8 +500,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectConfig.put("properties", properties);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
try {
sendPrivileged(CONNECTOR_KEY(connector), serializedConfig);
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write connector configuration to Kafka: ", e);
throw new ConnectException("Error writing connector configuration to Kafka", e);
@ -490,9 +522,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
public void removeConnectorConfig(String connector) {
log.debug("Removing connector configuration for connector '{}'", connector);
try {
sendPrivileged(CONNECTOR_KEY(connector), null);
sendPrivileged(TARGET_STATE_KEY(connector), null);
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
List<ProducerKeyValue> keyValues = Arrays.asList(
new ProducerKeyValue(CONNECTOR_KEY(connector), null),
new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
);
sendPrivileged(keyValues, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to remove connector configuration from Kafka: ", e);
throw new ConnectException("Error removing connector configuration from Kafka", e);
@ -521,10 +558,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
*/
@Override
public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
// Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
// any outstanding lagging data to consume.
try {
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
timer.update();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
@ -532,34 +571,44 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
int taskCount = configs.size();
// Start sending all the individual updates
// Send all the individual updates
int index = 0;
List<ProducerKeyValue> keyValues = new ArrayList<>();
for (Map<String, String> taskConfig: configs) {
Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
connectConfig.put("properties", taskConfig);
byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
log.debug("Writing configuration for connector '{}' task {}", connector, index);
ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
sendPrivileged(TASK_KEY(connectorTaskId), serializedConfig);
keyValues.add(new ProducerKeyValue(TASK_KEY(connectorTaskId), serializedConfig));
index++;
}
try {
sendPrivileged(keyValues, timer);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Failed to write task configurations to Kafka", e);
throw new ConnectException("Error writing task configurations to Kafka", e);
}
// Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
// the end of the log
try {
// Read to end to ensure all the task configs have been written
if (taskCount > 0) {
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
timer.update();
}
// Write the commit message
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
connectConfig.put("tasks", taskCount);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig);
sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig, timer);
// Read to end to ensure all the commit messages have been written
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
@ -587,7 +636,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectTargetState.put("state", state.name());
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
log.debug("Writing target state {} for connector {}", state, connector);
configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
try {
configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write target state to Kafka", e);
throw new ConnectException("Error writing target state to Kafka", e);
}
}
/**
@ -608,8 +662,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
byte[] serializedTaskCountRecord = converter.fromConnectData(topic, TASK_COUNT_RECORD_V0, taskCountRecord);
log.debug("Writing task count record {} for connector {}", taskCount, connector);
try {
sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord);
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write task count record with {} tasks for connector {} to Kafka: ", taskCount, connector, e);
throw new ConnectException("Error writing task count record to Kafka", e);
@ -635,8 +690,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
sessionKeyStruct.put("creation-timestamp", sessionKey.creationTimestamp());
byte[] serializedSessionKey = converter.fromConnectData(topic, SESSION_KEY_V0, sessionKeyStruct);
try {
sendPrivileged(SESSION_KEY_KEY, serializedSessionKey);
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(SESSION_KEY_KEY, serializedSessionKey, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write session key to Kafka: ", e);
throw new ConnectException("Error writing session key to Kafka", e);
@ -658,8 +714,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
value.put(ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
byte[] serializedValue = converter.fromConnectData(topic, value.schema(), value);
try {
sendPrivileged(key, serializedValue);
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(key, serializedValue, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write {} to Kafka: ", restartRequest, e);
throw new ConnectException("Error writing " + restartRequest + " to Kafka", e);
@ -711,9 +768,36 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
}
private void sendPrivileged(String key, byte[] value) {
/**
* Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
* successfully invoked before calling this method if this store is configured to use a fencable writer.
* @param key the record key
* @param value the record value
* @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
*/
private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
}
/**
* Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
* successfully invoked before calling this method if this store is configured to use a fencable writer.
* @param keyValues the list of producer record key/value pairs
* @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
*/
private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
if (!usesFencableWriter) {
configLog.send(key, value);
List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
keyValues.forEach(
keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
);
timer.update();
for (Future<RecordMetadata> future : producerFutures) {
future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
timer.update();
}
return;
}
@ -723,8 +807,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
try {
fencableProducer.beginTransaction();
fencableProducer.send(new ProducerRecord<>(topic, key, value));
keyValues.forEach(
keyValue -> fencableProducer.send(new ProducerRecord<>(topic, keyValue.key, keyValue.value))
);
fencableProducer.commitTransaction();
timer.update();
} catch (Exception e) {
log.warn("Failed to perform fencable send to config topic", e);
relinquishWritePrivileges();
@ -732,6 +819,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
}
private static class ProducerKeyValue {
final String key;
final byte[] value;
ProducerKeyValue(String key, byte[] value) {
this.key = key;
this.value = value;
}
}
private void relinquishWritePrivileges() {
if (fencableProducer != null) {
Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic");

View File

@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
@ -342,12 +343,29 @@ public class KafkaBasedLog<K, V> {
return future;
}
public void send(K key, V value) {
send(key, value, null);
/**
* Send a record asynchronously to the configured {@link #topic} without using a producer callback.
* @param key the key for the {@link ProducerRecord}
* @param value the value for the {@link ProducerRecord}
*
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
* future if synchronous behavior is desired.
*/
public Future<RecordMetadata> send(K key, V value) {
return send(key, value, null);
}
public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
producer.orElseThrow(() ->
/**
* Send a record asynchronously to the configured {@link #topic}
* @param key the key for the {@link ProducerRecord}
* @param value the value for the {@link ProducerRecord}
* @param callback the callback to invoke after completion; can be null if no callback is desired
*
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
* future if synchronous behavior is desired.
*/
public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
return producer.orElseThrow(() ->
new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
).send(new ProducerRecord<>(topic, key, value), callback);
}
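
A hedged usage sketch of the new return type (variable names are illustrative): callers that need synchronous semantics can block on the returned future, which is how KafkaConfigBackingStore now surfaces producer failures:

    Future<RecordMetadata> future = configLog.send(key, serializedValue);
    // Blocks until the write is acknowledged; producer errors (e.g. authorization
    // failures) surface here as ExecutionException instead of being silently dropped.
    future.get(30_000, TimeUnit.MILLISECONDS);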

View File

@ -262,6 +262,32 @@ public class AbstractHerderTest {
assertEquals(ConnectorType.SOURCE, info.type());
}
@Test
public void testPauseConnector() {
AbstractHerder herder = mock(AbstractHerder.class, withSettings()
.useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.defaultAnswer(CALLS_REAL_METHODS));
when(configStore.contains(CONN1)).thenReturn(true);
herder.pauseConnector(CONN1);
verify(configStore).putTargetState(CONN1, TargetState.PAUSED);
}
@Test
public void testResumeConnector() {
AbstractHerder herder = mock(AbstractHerder.class, withSettings()
.useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.defaultAnswer(CALLS_REAL_METHODS));
when(configStore.contains(CONN1)).thenReturn(true);
herder.resumeConnector(CONN1);
verify(configStore).putTargetState(CONN1, TargetState.STARTED);
}
@Test
public void testConnectorInfoMissingPlugin() {
AbstractHerder herder = mock(AbstractHerder.class, withSettings()

View File

@ -735,6 +735,58 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorConfigBackingStoreError() {
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
expectConfigRefreshAndSnapshot(SNAPSHOT);
member.wakeup();
PowerMock.expectLastCall();
// mock the actual validation since its asynchronous nature is difficult to test and should
// be covered sufficiently by the unit tests for the AbstractHerder class
Capture<Callback<ConfigInfos>> validateCallback = newCapture();
herder.validateConnectorConfig(EasyMock.eq(CONN2_CONFIG), capture(validateCallback));
PowerMock.expectLastCall().andAnswer(() -> {
validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
return null;
});
configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
PowerMock.expectLastCall().andThrow(new ConnectException("Error writing connector configuration to Kafka"));
// verify that the exception from config backing store write is propagated via the callback
putConnectorCallback.onCompletion(EasyMock.anyObject(ConnectException.class), EasyMock.isNull());
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
// These will occur just before/during the second tick
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
PowerMock.replayAll();
herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
// First tick runs the initial herder request, which issues an asynchronous request for
// connector validation
herder.tick();
// Once that validation is complete, another request is added to the herder request queue
// for actually performing the config write; this tick is for that request
herder.tick();
time.sleep(1000L);
assertStatistics(3, 1, 100, 1000L);
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorFailedValidation() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("leader");
@ -2592,6 +2644,7 @@ public class DistributedHerderTest {
});
member.wakeup();
PowerMock.expectLastCall();
configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
PowerMock.expectLastCall().andAnswer(() -> {
// Simulate response to writing config + waiting until end of log to be read

View File

@ -22,15 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
@ -62,6 +66,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -72,6 +78,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.junit.Assert.assertEquals;
@ -170,14 +177,17 @@ public class KafkaConfigBackingStoreTest {
KafkaBasedLog<String, byte[]> storeLog;
@Mock
Producer<String, byte[]> fencableProducer;
@Mock
Future<RecordMetadata> producerFuture;
private KafkaConfigBackingStore configStorage;
private Capture<String> capturedTopic = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
private final Capture<String> capturedTopic = EasyMock.newCapture();
private final Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private final Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
private final Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
private final Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
private final Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
private final MockTime time = new MockTime();
private long logOffset = 0;
@ -193,7 +203,7 @@ public class KafkaConfigBackingStoreTest {
configStorage = PowerMock.createPartialMock(
KafkaConfigBackingStore.class,
new String[]{"createKafkaBasedLog", "createFencableProducer"},
converter, config, null, null, CLIENT_ID_BASE);
converter, config, null, null, CLIENT_ID_BASE, time);
Whitebox.setInternalState(configStorage, "configLog", storeLog);
configStorage.setUpdateListener(configUpdateListener);
// The mock must be reset and re-mocked for the remainder of the test.
@ -326,6 +336,92 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
@Test
public void testPutConnectorConfigProducerError() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(1);
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"))));
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Verify initial state
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
assertEquals(0, configState.connectors().size());
// verify that the producer exception from KafkaBasedLog::send is propagated
ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testRemoveConnectorConfigSlowProducer() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(1);
@SuppressWarnings("unchecked")
Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
// tombstone for the connector config
storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
@SuppressWarnings("unchecked")
Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
// tombstone for the connector target state
storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
return null;
});
// the future get timeout is expected to be reduced according to how long the previous Future::get took
targetStateProducerFuture.get(EasyMock.eq(1000L), EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
time.sleep(1000);
return null;
});
@SuppressWarnings("unchecked")
Future<Void> future = PowerMock.createMock(Future.class);
EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> future);
// the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the
// timeout on the log read future to be 0
EasyMock.expect(future.get(EasyMock.eq(0L), EasyMock.anyObject())).andReturn(null);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
configStorage.removeConnectorConfig("test-connector");
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testWritePrivileges() throws Exception {
// With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer
@ -365,7 +461,9 @@ public class KafkaConfigBackingStoreTest {
// In the meantime, write a target state (which doesn't require write privileges)
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
PowerMock.expectLastCall();
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(null);
// Reclaim write privileges
expectFencableProducer();
@ -1497,13 +1595,18 @@ public class KafkaConfigBackingStoreTest {
// from the log. Validate the data that is captured when the conversion is performed matches the specified data
// (by checking a single field's value)
private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
final String dataFieldName, final Object dataFieldValue) {
final String dataFieldName, final Object dataFieldValue) throws Exception {
final Capture<Struct> capturedRecord = EasyMock.newCapture();
if (serialized != null)
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
.andReturn(serialized);
storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
PowerMock.expectLastCall();
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(null);
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
.andAnswer(() -> {
if (dataFieldName != null)
@ -1536,7 +1639,7 @@ public class KafkaConfigBackingStoreTest {
});
}
private void expectConnectorRemoval(String configKey, String targetStateKey) {
private void expectConnectorRemoval(String configKey, String targetStateKey) throws Exception {
expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
@ -1547,7 +1650,7 @@ public class KafkaConfigBackingStoreTest {
}
private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
final String dataFieldName, final Object dataFieldValue) {
final String dataFieldName, final Object dataFieldValue) throws Exception {
expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
recordsToRead.put(configKey, serialized);

View File

@ -261,7 +261,7 @@ public class EmbeddedConnectCluster {
putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" + connectClusterName);
putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-storage-topic-" + connectClusterName);
putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-status-topic-" + connectClusterName);
putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");