KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
Chris Egerton, 2024-06-04 15:36:24 +02:00 (committed by GitHub)
parent a08db65670
commit 0409003c43
8 changed files with 325 additions and 17 deletions
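
In brief: workers now compare raw (pre-transformation) task configs when deciding whether to publish a new generation, so a broken config provider reference can no longer block reconfiguration of a connector (KAFKA-16837); and KafkaConfigBackingStore now discards task configs whose connector config is absent, which can happen when log compaction removes both a deleted connector's config and its tombstone from the config topic (KAFKA-16838). The new integration tests below exercise both scenarios.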


@@ -124,7 +124,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
  */
 public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
-    private final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
+    private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
     private final String workerId;
     protected final Worker worker;
@@ -1039,16 +1039,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
         return result;
     }

-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public static boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> rawTaskProps) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
-        if (taskProps.size() != currentNumTasks) {
-            log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
+        if (rawTaskProps.size() != currentNumTasks) {
+            log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size());
             result = true;
         } else {
             for (int index = 0; index < currentNumTasks; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-                if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
+                if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
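
Why compare raw configs? Transformed values come from config providers and are re-resolved on every pass, so two otherwise-identical task config generations can differ, and resolution fails outright if the provider's backing source (for example, a secrets file) disappears. The raw `${provider:...}` reference, by contrast, is stable and needs no provider at comparison time. A minimal illustrative sketch (file path and values are hypothetical, not taken from this patch):

    import java.util.Map;

    public class RawVsTransformedSketch {
        public static void main(String[] args) {
            // Raw task config: stores the unresolved provider reference.
            Map<String, String> raw = Map.of(
                    "throughput", "${file:/tmp/test-secrets:secret-throughput}");

            // Transformed config: holds whatever the provider returned at resolution
            // time; deleting /tmp/test-secrets would make re-resolution throw instead.
            Map<String, String> transformed = Map.of("throughput", "10");

            // taskConfigsChanged(...) now performs raw-to-raw comparisons like this,
            // which are deterministic and never invoke a provider:
            System.out.println(raw.equals(raw)); // true -> "no change", nothing republished
        }
    }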


@@ -2229,11 +2229,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }

     private void publishConnectorTaskConfigs(String connName, List<Map<String, String>> taskProps, Callback<Void> cb) {
-        if (!taskConfigsChanged(configState, connName, taskProps)) {
+        List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
+        if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
             return;
         }
-        List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
         if (isLeader()) {
             writeTaskConfigs(connName, rawTaskProps);
             cb.onCompletion(null, null);


@@ -519,10 +519,10 @@ public class StandaloneHerder extends AbstractHerder {
         }

         List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        if (taskConfigsChanged(configState, connName, newTaskConfigs)) {
-            removeConnectorTasks(connName);
-            List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
+        List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
+        if (taskConfigsChanged(configState, connName, rawTaskConfigs)) {
+            removeConnectorTasks(connName);
             configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
             createConnectorTasks(connName);
         }


@@ -997,11 +997,8 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore implements ConfigBackingStore {
         synchronized (lock) {
             if (value.value() == null) {
                 // Connector deletion will be written as a null value
+                processConnectorRemoval(connectorName);
                 log.info("Successfully processed removal of connector '{}'", connectorName);
-                connectorConfigs.remove(connectorName);
-                connectorTaskCounts.remove(connectorName);
-                taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
-                deferredTaskUpdates.remove(connectorName);
                 removed = true;
             } else {
                 // Connector configs can be applied and callbacks invoked immediately
@@ -1064,6 +1061,21 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore implements ConfigBackingStore {
     private void processTasksCommitRecord(String connectorName, SchemaAndValue value) {
         List<ConnectorTaskId> updatedTasks = new ArrayList<>();
         synchronized (lock) {
+            // Edge case: connector was deleted before these task configs were published,
+            // but compaction took place and both the original connector config and the
+            // tombstone message for it have been removed from the config topic.
+            // We should ignore these task configs
+            if (!connectorConfigs.containsKey(connectorName)) {
+                processConnectorRemoval(connectorName);
+                log.debug(
+                        "Ignoring task configs for connector {}; it appears that the connector was deleted previously "
+                                + "and that log compaction has since removed any trace of its previous configurations "
+                                + "from the config topic",
+                        connectorName
+                );
+                return;
+            }
+
             // Apply any outstanding deferred task updates for the given connector. Note that just because we
             // encounter a commit message does not mean it will result in consistent output. In particular due to
             // compaction, there may be cases where . For example if we have the following sequence of writes:
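
Concretely, the sequence that produces this edge case looks like the following in the config topic (record keys follow the store's "connector-", "task-", and "commit-" prefixes; values abbreviated, connector name "conn" illustrative):

    offset 0: connector-conn  -> { config }       // connector created
    offset 1: task-conn-0     -> { task config }  // task config generated
    offset 2: commit-conn     -> { "tasks": 1 }   // task configs committed
    offset 3: connector-conn  -> null             // tombstone: connector deleted

Compaction may remove offset 0 (superseded by the tombstone) and then, once delete.retention.ms elapses, the tombstone at offset 3 itself, while offsets 1 and 2 survive because task configs and commit records are never tombstoned. A worker restoring from the compacted topic would otherwise resurrect tasks for a connector that no longer exists; the containsKey guard above discards them, and clears any related in-memory state, instead.
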
@@ -1168,7 +1180,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore implements ConfigBackingStore {
             log.debug("Setting task count record for connector '{}' to {}", connectorName, taskCount);
             connectorTaskCountRecords.put(connectorName, taskCount);
-            // If a task count record appears after the latest task configs, the connectors doesn't need a round of zombie
+            // If a task count record appears after the latest task configs, the connector doesn't need a round of zombie
             // fencing before it can start tasks with the latest configs
             connectorsPendingFencing.remove(connectorName);
         }
@@ -1244,6 +1256,13 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore implements ConfigBackingStore {
         }
     }

+    private void processConnectorRemoval(String connectorName) {
+        connectorConfigs.remove(connectorName);
+        connectorTaskCounts.remove(connectorName);
+        taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+        deferredTaskUpdates.remove(connectorName);
+    }
+
     private ConnectorTaskId parseTaskId(String key) {
         String[] parts = key.split("-");
         if (parts.length < 3) return null;


@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.integration;

 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.provider.FileConfigProvider;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -39,11 +40,14 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;

+import java.io.File;
+import java.io.FileOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -56,6 +60,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG;
+import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG;
+import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG;
 import static org.apache.kafka.connect.integration.BlockingConnectorTest.TASK_STOP;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@@ -71,6 +78,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_PREFIX;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG;
@@ -108,6 +116,9 @@ public class ConnectWorkerIntegrationTest {
     @Rule
     public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);

+    @Rule
+    public TemporaryFolder tmp = new TemporaryFolder();
+
     @Before
     public void setup() {
         // setup Connect worker properties
@@ -1123,6 +1134,194 @@ public class ConnectWorkerIntegrationTest {
         );
     }

+    /**
+     * Task configs are not removed from the config topic after a connector is deleted.
+     * When topic compaction takes place, this can cause the tombstone message for the
+     * connector config to be deleted, leaving the task configs in the config topic with no
+     * explicit record of the connector's deletion.
+     * <p>
+     * This test guarantees that those older task configs are never used, even when the
+     * connector is recreated later.
+     */
+    @Test
+    public void testCompactedDeletedOlderConnectorConfig() throws Exception {
+        brokerProps.put("log.cleaner.backoff.ms", "100");
+        brokerProps.put("log.cleaner.delete.retention.ms", "1");
+        brokerProps.put("log.cleaner.max.compaction.lag.ms", "1");
+        brokerProps.put("log.cleaner.min.cleanable.ratio", "0");
+        brokerProps.put("log.cleaner.min.compaction.lag.ms", "1");
+        brokerProps.put("log.cleaner.threads", "1");
+        final String configTopic = "kafka-16838-configs";
+        final int offsetCommitIntervalMs = 100;
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        workerProps.put(CONFIG_STORAGE_PREFIX + SEGMENT_MS_CONFIG, "100");
+        workerProps.put(CONFIG_STORAGE_PREFIX + DELETE_RETENTION_MS_CONFIG, "1");
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs));
+
+        final int numWorkers = 1;
+        connect = connectBuilder
+                .numWorkers(numWorkers)
+                .build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                numWorkers,
+                "Initial group of workers did not start in time."
+        );
+
+        final String connectorTopic = "connector-topic";
+        connect.kafka().createTopic(connectorTopic, 1);
+
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connectorHandle.expectedCommits(NUM_TASKS * 2);
+
+        Map<String, String> connectorConfig = defaultSourceConnectorProps(connectorTopic);
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or its tasks did not start in time"
+        );
+        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+
+        connect.deleteConnector(CONNECTOR_NAME);
+
+        // Roll the entire cluster
+        connect.activeWorkers().forEach(connect::removeWorker);
+
+        // Miserable hack: produce directly to the config topic and then wait a little bit
+        // in order to trigger segment rollover and allow compaction to take place
+        connect.kafka().produce(configTopic, "garbage-key-1", null);
+        Thread.sleep(1_000);
+        connect.kafka().produce(configTopic, "garbage-key-2", null);
+        Thread.sleep(1_000);
+
+        for (int i = 0; i < numWorkers; i++)
+            connect.addWorker();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                numWorkers,
+                "Initial group of workers did not start in time."
+        );
+
+        final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0);
+        final long initialEndOffset = connect.kafka().endOffset(connectorTopicPartition);
+        assertTrue(
+                "Source connector should have published at least one record to Kafka",
+                initialEndOffset > 0
+        );
+
+        connectorHandle.expectedCommits(NUM_TASKS * 2);
+
+        // Re-create the connector with a different config (targets a different topic)
+        final String otherConnectorTopic = "other-topic";
+        connect.kafka().createTopic(otherConnectorTopic, 1);
+        connectorConfig.put(TOPIC_CONFIG, otherConnectorTopic);
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or its tasks did not start in time"
+        );
+        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+
+        // See if any new records got written to the old topic
+        final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition);
+        assertEquals(
+                "No new records should have been written to the older topic",
+                initialEndOffset,
+                nextEndOffset
+        );
+    }
+
+    /**
+     * If a connector has existing tasks, and then generates new task configs, workers compare the
+     * new and existing configs before publishing them to the config topic. If there is no difference,
+     * workers do not publish task configs (this is a workaround to prevent infinite loops with eager
+     * rebalancing).
+     * <p>
+     * This test tries to guarantee that, if the old task configs become invalid because of
+     * an invalid config provider reference, it will still be possible to reconfigure the connector.
+     */
+    @Test
+    public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
+        final int offsetCommitIntervalMs = 100;
+        workerProps.put(CONFIG_PROVIDERS_CONFIG, "file");
+        workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", FileConfigProvider.class.getName());
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs));
+
+        final int numWorkers = 1;
+        connect = connectBuilder
+                .numWorkers(numWorkers)
+                .build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                numWorkers,
+                "Initial group of workers did not start in time."
+        );
+
+        final String firstConnectorTopic = "connector-topic-1";
+        connect.kafka().createTopic(firstConnectorTopic);
+
+        final File secretsFile = tmp.newFile("test-secrets");
+        final Properties secrets = new Properties();
+        final String throughputSecretKey = "secret-throughput";
+        secrets.put(throughputSecretKey, "10");
+        try (FileOutputStream secretsOutputStream = new FileOutputStream(secretsFile)) {
+            secrets.store(secretsOutputStream, null);
+        }
+
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connectorHandle.expectedCommits(NUM_TASKS * 2);
+
+        Map<String, String> connectorConfig = defaultSourceConnectorProps(firstConnectorTopic);
+        connectorConfig.put(
+                "throughput",
+                "${file:" + secretsFile.getAbsolutePath() + ":" + throughputSecretKey + "}"
+        );
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or its tasks did not start in time"
+        );
+        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+
+        // Delete the secrets file, which should render the old task configs invalid
+        assertTrue("Failed to delete secrets file", secretsFile.delete());
+
+        // Use a start latch here instead of assertConnectorAndExactlyNumTasksAreRunning
+        // since failure to reconfigure the tasks (which may occur if the bug this test was written
+        // to help catch resurfaces) will not cause existing tasks to fail or stop running
+        StartAndStopLatch restarts = connectorHandle.expectedStarts(1);
+        connectorHandle.expectedCommits(NUM_TASKS * 2);
+
+        final String secondConnectorTopic = "connector-topic-2";
+        connect.kafka().createTopic(secondConnectorTopic, 1);
+
+        // Stop using the config provider for this connector, and instruct it to start writing to a
+        // new topic
+        connectorConfig.put("throughput", "10");
+        connectorConfig.put(TOPIC_CONFIG, secondConnectorTopic);
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        assertTrue(
+                "Connector tasks were not restarted in time",
+                restarts.await(10, TimeUnit.SECONDS)
+        );
+        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+
+        final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
+        assertTrue(
+                "Source connector should have published at least one record to new Kafka topic "
+                        + "after being reconfigured",
+                endOffset > 0
+        );
+    }
+
     private Map<String, String> defaultSourceConnectorProps(String topic) {
         // setup props for the source connector
         Map<String, String> props = new HashMap<>();
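
For reference, the cleaner settings that the first test tightens exist only to make compaction happen within the test's timeframe; stock broker defaults would leave the config topic uncompacted for far too long. An annotated restatement, with the same values as the test above and editorial comments (the explanations are not part of the patch):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class CompactionKnobsSketch {
        public static void main(String[] args) {
            Properties brokerProps = new Properties();
            brokerProps.put("log.cleaner.backoff.ms", "100");          // re-check for cleanable logs every 100 ms
            brokerProps.put("log.cleaner.delete.retention.ms", "1");   // tombstones become deletable almost immediately
            brokerProps.put("log.cleaner.max.compaction.lag.ms", "1"); // dirty records must be compacted quickly
            brokerProps.put("log.cleaner.min.cleanable.ratio", "0");   // any dirty bytes make a log eligible for cleaning
            brokerProps.put("log.cleaner.min.compaction.lag.ms", "1"); // but never compact records younger than 1 ms
            brokerProps.put("log.cleaner.threads", "1");               // one cleaner thread suffices for the test

            // The cleaner never touches a topic's active segment, so the config topic
            // must also roll segments quickly; CONFIG_STORAGE_PREFIX ("config.storage.")
            // plus the topic-level setting name yields the worker properties below.
            Map<String, String> workerProps = new HashMap<>();
            workerProps.put("config.storage.segment.ms", "100");
            workerProps.put("config.storage.delete.retention.ms", "1");
        }
    }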


@@ -84,12 +84,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -1116,6 +1118,31 @@ public class AbstractHerderTest {
         assertEquals(offsets, cb.get(1000, TimeUnit.MILLISECONDS));
     }
+    @Test
+    public void testTaskConfigComparison() {
+        ClusterConfigState snapshot = mock(ClusterConfigState.class);
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size());
+        TASK_CONFIGS_MAP.forEach((task, config) -> when(snapshot.rawTaskConfig(task)).thenReturn(config));
+
+        // Same task configs, same number of tasks--no change
+        assertFalse(AbstractHerder.taskConfigsChanged(snapshot, CONN1, TASK_CONFIGS));
+
+        // Different number of tasks; should report a change
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size() + 1);
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshot, CONN1, TASK_CONFIGS));
+
+        // Last task config is different; should report a change
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size());
+        List<Map<String, String>> alteredTaskConfigs = new ArrayList<>(TASK_CONFIGS);
+        alteredTaskConfigs.set(alteredTaskConfigs.size() - 1, Collections.emptyMap());
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshot, CONN1, alteredTaskConfigs));
+
+        // Make sure we used exclusively raw task configs and never attempted transformation,
+        // since otherwise failures in transformation could either cause an infinite loop of task
+        // config generation, or prevent any task configs from being published
+        verify(snapshot, never()).taskConfig(any());
+    }
+
     protected void addConfigKey(Map<String, ConfigDef.ConfigKey> keys, String name, String group) {
         ConfigDef configDef = new ConfigDef().define(name, ConfigDef.Type.STRING, null, null,
                 ConfigDef.Importance.HIGH, "doc", group, 10,


@@ -171,7 +171,8 @@ public class KafkaConfigBackingStoreMockitoTest {
             .put("state.v2", "STOPPED");

     private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
             new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
-            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
+            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9),
+            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 2)
     );

     // The exact format doesn't matter here since both conversions are mocked
@@ -814,6 +815,56 @@ public class KafkaConfigBackingStoreMockitoTest {
         verify(configLog).stop();
     }
+    @Test
+    public void testRestoreCompactedDeletedConnector() {
+        // When a connector is deleted, we emit a tombstone record for its config (with key
+        // "connector-<name>") and its target state (with key "target-state-<name>"), but not
+        // for its task configs.
+        // As a result, we need to carefully handle the case where task configs are present in
+        // the config topic for a connector, but there is no accompanying config for the
+        // connector itself
+        int offset = 0;
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(2));
+        logOffset = offset;
+
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Should see no connectors and no task configs
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(Collections.emptySet(), configState.connectors());
+        assertEquals(0, configState.taskCount(CONNECTOR_1_NAME));
+        assertNull(configState.rawTaskConfig(TASK_IDS.get(0)));
+        assertNull(configState.rawTaskConfig(TASK_IDS.get(1)));
+
+        // Probe internal collections just to be sure
+        assertEquals(Collections.emptyMap(), configState.connectorConfigs);
+        assertEquals(Collections.emptyMap(), configState.taskConfigs);
+        assertEquals(Collections.emptyMap(), configState.connectorTaskCounts);
+
+        // Exception: we still include task count records, for the unlikely-but-possible case
+        // where there are still zombie instances of the tasks for this long-deleted connector
+        // running somewhere on the cluster
+        assertEquals(2, (int) configState.taskCountRecord(CONNECTOR_1_NAME));
+    }
+
     @Test
     public void testRecordToRestartRequest() {
         ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),


@@ -50,7 +50,6 @@ import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -112,7 +111,7 @@ public class EmbeddedKafkaCluster {
     // Kafka Config
     private final KafkaServer[] brokers;
     private final Properties brokerConfig;
-    private final Time time = new MockTime();
+    private final Time time = Time.SYSTEM;
     private final int[] currentBrokerPorts;
     private final String[] currentBrokerLogDirs;
     private final boolean hasListenerConfig;
@@ -611,6 +610,19 @@ public class EmbeddedKafkaCluster {
         return new ConsumerRecords<>(records);
     }

+    public long endOffset(TopicPartition topicPartition) throws TimeoutException, InterruptedException, ExecutionException {
+        try (Admin admin = createAdminClient()) {
+            Map<TopicPartition, OffsetSpec> offsets = Collections.singletonMap(
+                    topicPartition, OffsetSpec.latest()
+            );
+            return admin.listOffsets(offsets)
+                    .partitionResult(topicPartition)
+                    // Hardcode duration for now; if necessary, we can add a parameter for it later
+                    .get(10, TimeUnit.SECONDS)
+                    .offset();
+        }
+    }
+
     /**
      * List all the known partitions for the given {@link Collection} of topics
      * @param maxDurationMs the max duration to wait for while fetching metadata from Kafka (in milliseconds).
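
A usage sketch for the new endOffset helper, mirroring the assertions in the integration tests above (`connect` is an EmbeddedConnectCluster as in ConnectWorkerIntegrationTest; the topic name is illustrative):

    TopicPartition tp = new TopicPartition("connector-topic", 0);
    long before = connect.kafka().endOffset(tp);
    // ... let the connector run for a few offset commit intervals ...
    long after = connect.kafka().endOffset(tp);
    assertTrue("Source connector should have produced new records", after > before);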