From 0409003c4395580b3d0b9e861e27ce61295fcdcf Mon Sep 17 00:00:00 2001
From: Chris Egerton
Date: Tue, 4 Jun 2024 15:36:24 +0200
Subject: [PATCH] KAFKA-16837, KAFKA-16838: Ignore task configs for deleted
 connectors, and compare raw task configs before publishing them (#16122)

Reviewers: Mickael Maison
---
 .../kafka/connect/runtime/AbstractHerder.java |  10 +-
 .../distributed/DistributedHerder.java        |   4 +-
 .../runtime/standalone/StandaloneHerder.java  |   4 +-
 .../storage/KafkaConfigBackingStore.java      |  29 ++-
 .../ConnectWorkerIntegrationTest.java         | 199 ++++++++++++++++++
 .../connect/runtime/AbstractHerderTest.java   |  27 +++
 .../KafkaConfigBackingStoreMockitoTest.java   |  53 ++++-
 .../util/clusters/EmbeddedKafkaCluster.java   |  16 +-
 8 files changed, 325 insertions(+), 17 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 2a27103079a..c6aeea80a26 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -124,7 +124,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_C
  */
 public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
 
-    private final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
+    private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
 
     private final String workerId;
     protected final Worker worker;
@@ -1039,16 +1039,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public static boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> rawTaskProps) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
-        if (taskProps.size() != currentNumTasks) {
-            log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
+        if (rawTaskProps.size() != currentNumTasks) {
+            log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size());
             result = true;
         } else {
             for (int index = 0; index < currentNumTasks; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-                if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
+                if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
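Why the comparison now uses raw values: transformed task configs can legitimately differ between two evaluations of the same raw config (for example, when a config provider re-resolves a reference), so comparing transformed values can report a change on every pass. A minimal standalone sketch of that failure mode (the class name and resolver lambdas are illustrative, not from the codebase):

    import java.util.Collections;
    import java.util.Map;
    import java.util.function.UnaryOperator;

    public class RawVsTransformedSketch {
        public static void main(String[] args) {
            // Raw config as stored in the config topic: the provider reference is left unresolved
            Map<String, String> raw = Collections.singletonMap("throughput", "${file:/tmp/secrets.properties:rate}");

            // Stand-ins for two evaluations of a config provider whose backing file changed
            UnaryOperator<String> resolvedEarlier = v -> "10";
            UnaryOperator<String> resolvedNow = v -> "20";

            // Transformed values differ even though nothing was reconfigured...
            System.out.println(resolvedEarlier.apply(raw.get("throughput"))
                    .equals(resolvedNow.apply(raw.get("throughput")))); // false
            // ...while the raw value compares equal to itself, so a raw comparison stays quiet
            System.out.println(raw.get("throughput").equals(raw.get("throughput"))); // true
        }
    }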
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cdffbb87871..ab46ee536ac 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -2229,11 +2229,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     private void publishConnectorTaskConfigs(String connName, List<Map<String, String>> taskProps, Callback<Void> cb) {
-        if (!taskConfigsChanged(configState, connName, taskProps)) {
+        List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
+        if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
             return;
         }
 
-        List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
         if (isLeader()) {
             writeTaskConfigs(connName, rawTaskProps);
             cb.onCompletion(null, null);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index e773eeefd5c..2768d910d4b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -519,10 +519,10 @@ public class StandaloneHerder extends AbstractHerder {
         }
 
         List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+        List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
 
-        if (taskConfigsChanged(configState, connName, newTaskConfigs)) {
+        if (taskConfigsChanged(configState, connName, rawTaskConfigs)) {
             removeConnectorTasks(connName);
-            List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
             configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
             createConnectorTasks(connName);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index c24dd6c7907..7981f4425dc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -997,11 +997,8 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
         synchronized (lock) {
             if (value.value() == null) {
                 // Connector deletion will be written as a null value
+                processConnectorRemoval(connectorName);
                 log.info("Successfully processed removal of connector '{}'", connectorName);
-                connectorConfigs.remove(connectorName);
-                connectorTaskCounts.remove(connectorName);
-                taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
-                deferredTaskUpdates.remove(connectorName);
                 removed = true;
             } else {
                 // Connector configs can be applied and callbacks invoked immediately
@@ -1064,6 +1061,21 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
     private void processTasksCommitRecord(String connectorName, SchemaAndValue value) {
         List<ConnectorTaskId> updatedTasks = new ArrayList<>();
         synchronized (lock) {
+            // Edge case: connector was deleted before these task configs were published,
+            // but compaction took place and both the original connector config and the
+            // tombstone message for it have been removed from the config topic
+            // We should ignore these task configs
+            if (!connectorConfigs.containsKey(connectorName)) {
+                processConnectorRemoval(connectorName);
+                log.debug(
+                        "Ignoring task configs for connector {}; it appears that the connector was deleted previously "
+                                + "and that log compaction has since removed any trace of its previous configurations "
+                                + "from the config topic",
+                        connectorName
+                );
+                return;
+            }
+
             // Apply any outstanding deferred task updates for the given connector. Note that just because we
             // encounter a commit message does not mean it will result in consistent output. In particular due to
             // compaction, there may be cases where .
For example if we have the following sequence of writes: @@ -1168,7 +1180,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme log.debug("Setting task count record for connector '{}' to {}", connectorName, taskCount); connectorTaskCountRecords.put(connectorName, taskCount); - // If a task count record appears after the latest task configs, the connectors doesn't need a round of zombie + // If a task count record appears after the latest task configs, the connector doesn't need a round of zombie // fencing before it can start tasks with the latest configs connectorsPendingFencing.remove(connectorName); } @@ -1244,6 +1256,13 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme } } + private void processConnectorRemoval(String connectorName) { + connectorConfigs.remove(connectorName); + connectorTaskCounts.remove(connectorName); + taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName)); + deferredTaskUpdates.remove(connectorName); + } + private ConnectorTaskId parseTaskId(String key) { String[] parts = key.split("-"); if (parts.length < 3) return null; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 8a730c198d3..24cbd1c280e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.integration; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.provider.FileConfigProvider; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; @@ -39,11 +40,14 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; import org.junit.rules.TestRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; +import java.io.File; +import java.io.FileOutputStream; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -56,6 +60,9 @@ import java.util.concurrent.atomic.AtomicReference; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG; import static org.apache.kafka.connect.integration.BlockingConnectorTest.TASK_STOP; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; @@ -71,6 +78,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG; +import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_PREFIX; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG; @@ -108,6 +116,9 @@ public class ConnectWorkerIntegrationTest { @Rule public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log); + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + @Before public void setup() { // setup Connect worker properties @@ -1123,6 +1134,194 @@ public class ConnectWorkerIntegrationTest { ); } + /** + * Task configs are not removed from the config topic after a connector is deleted. + * When topic compaction takes place, this can cause the tombstone message for the + * connector config to be deleted, leaving the task configs in the config topic with no + * explicit record of the connector's deletion. + *
<p>
+ * This test guarantees that those older task configs are never used, even when the + * connector is recreated later. + */ + @Test + public void testCompactedDeletedOlderConnectorConfig() throws Exception { + brokerProps.put("log.cleaner.backoff.ms", "100"); + brokerProps.put("log.cleaner.delete.retention.ms", "1"); + brokerProps.put("log.cleaner.max.compaction.lag.ms", "1"); + brokerProps.put("log.cleaner.min.cleanable.ratio", "0"); + brokerProps.put("log.cleaner.min.compaction.lag.ms", "1"); + brokerProps.put("log.cleaner.threads", "1"); + + final String configTopic = "kafka-16838-configs"; + final int offsetCommitIntervalMs = 100; + workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); + workerProps.put(CONFIG_STORAGE_PREFIX + SEGMENT_MS_CONFIG, "100"); + workerProps.put(CONFIG_STORAGE_PREFIX + DELETE_RETENTION_MS_CONFIG, "1"); + workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs)); + + final int numWorkers = 1; + connect = connectBuilder + .numWorkers(numWorkers) + .build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp( + numWorkers, + "Initial group of workers did not start in time." + ); + + final String connectorTopic = "connector-topic"; + connect.kafka().createTopic(connectorTopic, 1); + + ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); + connectorHandle.expectedCommits(NUM_TASKS * 2); + + Map connectorConfig = defaultSourceConnectorProps(connectorTopic); + connect.configureConnector(CONNECTOR_NAME, connectorConfig); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector or its tasks did not start in time" + ); + connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + + connect.deleteConnector(CONNECTOR_NAME); + + // Roll the entire cluster + connect.activeWorkers().forEach(connect::removeWorker); + + // Miserable hack: produce directly to the config topic and then wait a little bit + // in order to trigger segment rollover and allow compaction to take place + connect.kafka().produce(configTopic, "garbage-key-1", null); + Thread.sleep(1_000); + connect.kafka().produce(configTopic, "garbage-key-2", null); + Thread.sleep(1_000); + + for (int i = 0; i < numWorkers; i++) + connect.addWorker(); + + connect.assertions().assertAtLeastNumWorkersAreUp( + numWorkers, + "Initial group of workers did not start in time." 
+ ); + + final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0); + final long initialEndOffset = connect.kafka().endOffset(connectorTopicPartition); + assertTrue( + "Source connector should have published at least one record to Kafka", + initialEndOffset > 0 + ); + + connectorHandle.expectedCommits(NUM_TASKS * 2); + + // Re-create the connector with a different config (targets a different topic) + final String otherConnectorTopic = "other-topic"; + connect.kafka().createTopic(otherConnectorTopic, 1); + connectorConfig.put(TOPIC_CONFIG, otherConnectorTopic); + connect.configureConnector(CONNECTOR_NAME, connectorConfig); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector or its tasks did not start in time" + ); + connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + + // See if any new records got written to the old topic + final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition); + assertEquals( + "No new records should have been written to the older topic", + initialEndOffset, + nextEndOffset + ); + } + + /** + * If a connector has existing tasks, and then generates new task configs, workers compare the + * new and existing configs before publishing them to the config topic. If there is no difference, + * workers do not publish task configs (this is a workaround to prevent infinite loops with eager + * rebalancing). + *
<p>
+ * This test tries to guarantee that, if the old task configs become invalid because of + * an invalid config provider reference, it will still be possible to reconfigure the connector. + */ + @Test + public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception { + final int offsetCommitIntervalMs = 100; + workerProps.put(CONFIG_PROVIDERS_CONFIG, "file"); + workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", FileConfigProvider.class.getName()); + workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs)); + + final int numWorkers = 1; + connect = connectBuilder + .numWorkers(numWorkers) + .build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp( + numWorkers, + "Initial group of workers did not start in time." + ); + + final String firstConnectorTopic = "connector-topic-1"; + connect.kafka().createTopic(firstConnectorTopic); + + final File secretsFile = tmp.newFile("test-secrets"); + final Properties secrets = new Properties(); + final String throughputSecretKey = "secret-throughput"; + secrets.put(throughputSecretKey, "10"); + try (FileOutputStream secretsOutputStream = new FileOutputStream(secretsFile)) { + secrets.store(secretsOutputStream, null); + } + + ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); + connectorHandle.expectedCommits(NUM_TASKS * 2); + + Map connectorConfig = defaultSourceConnectorProps(firstConnectorTopic); + connectorConfig.put( + "throughput", + "${file:" + secretsFile.getAbsolutePath() + ":" + throughputSecretKey + "}" + ); + connect.configureConnector(CONNECTOR_NAME, connectorConfig); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector or its tasks did not start in time" + ); + connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + + // Delete the secrets file, which should render the old task configs invalid + assertTrue("Failed to delete secrets file", secretsFile.delete()); + + // Use a start latch here instead of assertConnectorAndExactlyNumTasksAreRunning + // since failure to reconfigure the tasks (which may occur if the bug this test was written + // to help catch resurfaces) will not cause existing tasks to fail or stop running + StartAndStopLatch restarts = connectorHandle.expectedStarts(1); + connectorHandle.expectedCommits(NUM_TASKS * 2); + + final String secondConnectorTopic = "connector-topic-2"; + connect.kafka().createTopic(secondConnectorTopic, 1); + + // Stop using the config provider for this connector, and instruct it to start writing to the + // old topic again + connectorConfig.put("throughput", "10"); + connectorConfig.put(TOPIC_CONFIG, secondConnectorTopic); + connect.configureConnector(CONNECTOR_NAME, connectorConfig); + assertTrue( + "Connector tasks were not restarted in time", + restarts.await(10, TimeUnit.SECONDS) + ); + connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + + final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0)); + assertTrue( + "Source connector should have published at least one record to new Kafka topic " + + "after being reconfigured", + endOffset > 0 + ); + } + private Map defaultSourceConnectorProps(String topic) { // setup props for the source connector Map props = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 62283a02771..5bfbe2498ca 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -84,12 +84,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -1116,6 +1118,31 @@ public class AbstractHerderTest {
         assertEquals(offsets, cb.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test
+    public void testTaskConfigComparison() {
+        ClusterConfigState snapshot = mock(ClusterConfigState.class);
+
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size());
+        TASK_CONFIGS_MAP.forEach((task, config) -> when(snapshot.rawTaskConfig(task)).thenReturn(config));
+        // Same task configs, same number of tasks--no change
+        assertFalse(AbstractHerder.taskConfigsChanged(snapshot, CONN1, TASK_CONFIGS));
+
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size() + 1);
+        // Different number of tasks; should report a change
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshot, CONN1, TASK_CONFIGS));
+
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size());
+        List<Map<String, String>> alteredTaskConfigs = new ArrayList<>(TASK_CONFIGS);
+        alteredTaskConfigs.set(alteredTaskConfigs.size() - 1, Collections.emptyMap());
+        // Last task config is different; should report a change
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshot, CONN1, alteredTaskConfigs));
+
+        // Make sure we used exclusively raw task configs and never attempted transformation,
+        // since otherwise failures in transformation could either cause an infinite loop of task
+        // config generation, or prevent any task configs from being published
+        verify(snapshot, never()).taskConfig(any());
+    }
+
     protected void addConfigKey(Map<String, ConfigDef.ConfigKey> keys, String name, String group) {
         ConfigDef configDef = new ConfigDef().define(name, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.HIGH, "doc", group, 10,
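The restore test below simulates records read back from the compacted config topic. For orientation, a sketch of the per-connector key formats those records use (prefixes reflect my reading of KafkaConfigBackingStore and should be treated as illustrative, particularly the task count record prefix):

    public class ConfigTopicKeySketch {
        public static void main(String[] args) {
            String connector = "connector1";
            int tasks = 2;
            // Per-connector keys written to the compacted config topic (assumed formats)
            System.out.println("connector config:  connector-" + connector);
            System.out.println("target state:      target-state-" + connector);
            System.out.println("task commit:       commit-" + connector);
            System.out.println("task count record: tasks-fencing-" + connector); // prefix assumed
            for (int i = 0; i < tasks; i++) {
                System.out.println("task config:       task-" + connector + "-" + i);
            }
            // Only the connector config and target state are tombstoned on deletion,
            // which is why orphaned task records can survive compaction.
        }
    }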
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java
index 3ec037734f1..8d8980d322c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java
@@ -171,7 +171,8 @@ public class KafkaConfigBackingStoreMockitoTest {
             .put("state.v2", "STOPPED");
     private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
             new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
-            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
+            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9),
+            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 2)
     );
 
     // The exact format doesn't matter here since both conversions are mocked
@@ -814,6 +815,56 @@ public class KafkaConfigBackingStoreMockitoTest {
         verify(configLog).stop();
     }
 
+    @Test
+    public void testRestoreCompactedDeletedConnector() {
+        // When a connector is deleted, we emit a tombstone record for its config (with key
+        // "connector-<name>") and its target state (with key "target-state-<name>"), but not
+        // for its task configs
+        // As a result, we need to carefully handle the case where task configs are present in
+        // the config topic for a connector, but there is no accompanying config for the
+        // connector itself
+
+        int offset = 0;
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(2));
+        logOffset = offset;
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Should see no connectors and no task configs
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(Collections.emptySet(), configState.connectors());
+        assertEquals(0, configState.taskCount(CONNECTOR_1_NAME));
+        assertNull(configState.rawTaskConfig(TASK_IDS.get(0)));
+        assertNull(configState.rawTaskConfig(TASK_IDS.get(1)));
+
+        // Probe internal collections just to be sure
+        assertEquals(Collections.emptyMap(), configState.connectorConfigs);
+        assertEquals(Collections.emptyMap(), configState.taskConfigs);
+        assertEquals(Collections.emptyMap(), configState.connectorTaskCounts);
+
+        // Exception: we still include task count records, for the unlikely-but-possible case
+        // where there are still zombie instances of the tasks for this long-deleted connector
+        // running somewhere on the cluster
+        assertEquals(2, (int) configState.taskCountRecord(CONNECTOR_1_NAME));
+    }
+
     @Test
     public void testRecordToRestartRequest() {
         ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
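To see concretely what the test above simulates, here is a toy model of the topic state it starts from (keys abbreviated; a compacted topic keeps the newest value per key and eventually drops tombstones after delete.retention.ms):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CompactionSketch {
        public static void main(String[] args) {
            Map<String, String> log = new LinkedHashMap<>();
            log.put("connector-c1", "config-v1");  // connector created
            log.put("task-c1-0", "task config");   // task configs published
            log.put("commit-c1", "2 tasks");
            log.put("connector-c1", null);         // connector deleted (tombstone)

            // After the tombstone itself is compacted away...
            log.values().removeIf(v -> v == null);

            // ...only the orphaned task records remain, with no trace of the deletion:
            System.out.println(log); // {task-c1-0=task config, commit-c1=2 tasks}
        }
    }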
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index f959d225377..fe97476a712 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -50,7 +50,6 @@ import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -112,7 +111,7 @@ public class EmbeddedKafkaCluster {
     // Kafka Config
     private final KafkaServer[] brokers;
     private final Properties brokerConfig;
-    private final Time time = new MockTime();
+    private final Time time = Time.SYSTEM;
     private final int[] currentBrokerPorts;
     private final String[] currentBrokerLogDirs;
     private final boolean hasListenerConfig;
@@ -611,6 +610,19 @@ public class EmbeddedKafkaCluster {
         return new ConsumerRecords<>(records);
     }
 
+    public long endOffset(TopicPartition topicPartition) throws TimeoutException, InterruptedException, ExecutionException {
+        try (Admin admin = createAdminClient()) {
+            Map<TopicPartition, OffsetSpec> offsets = Collections.singletonMap(
+                    topicPartition, OffsetSpec.latest()
+            );
+            return admin.listOffsets(offsets)
+                    .partitionResult(topicPartition)
+                    // Hardcode duration for now; if necessary, we can add a parameter for it later
+                    .get(10, TimeUnit.SECONDS)
+                    .offset();
+        }
+    }
+
     /**
      * List all the known partitions for the given {@link Collection} of topics
     * @param maxDurationMs the max duration to wait for while fetching metadata from Kafka (in milliseconds).
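For reference, a sketch of how the new endOffset helper is meant to be used from an integration test; the surrounding scaffolding (a running EmbeddedConnectCluster field named connect, a connector already producing to "connector-topic") is assumed:

    import org.apache.kafka.common.TopicPartition;
    import org.junit.Test;
    import static org.junit.Assert.assertTrue;

    @Test
    public void verifiesSourceProgress() throws Exception {
        // assumes a running EmbeddedConnectCluster field named connect
        TopicPartition tp = new TopicPartition("connector-topic", 0);
        long end = connect.kafka().endOffset(tp); // resolves the latest offset via an Admin client
        assertTrue("source connector should have produced records", end > 0);
    }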