From a97fb662fdb84818a273271243f260ab5c447838 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Wed, 8 Jan 2025 17:55:48 +0800
Subject: [PATCH] MINOR: add testRestoreCompactedDeletedConnector back to
 KafkaConfigBackingStoreTest (#18392)

Reviewers: Chia-Ping Tsai
---
 .../storage/KafkaConfigBackingStoreTest.java | 54 ++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 4173d9a357c..d455976423d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -69,6 +69,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -174,7 +175,8 @@ public class KafkaConfigBackingStoreTest {
             .put("state.v2", "STOPPED");
     private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
             new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
-            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
+            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9),
+            new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 2)
     );
 
     // The exact format doesn't matter here since both conversions are mocked
@@ -818,6 +820,56 @@
         verify(configLog).stop();
     }
 
+    @Test
+    public void testRestoreCompactedDeletedConnector() {
+        // When a connector is deleted, we emit a tombstone record for its config (with key
+        // "connector-<connector name>") and its target state (with key "target-state-<connector name>"), but not
+        // for its task configs
+        // As a result, we need to carefully handle the case where task configs are present in
+        // the config topic for a connector, but there is no accompanying config for the
+        // connector itself
+
+        int offset = 0;
+        List<ConsumerRecord<String, byte[]>> existingRecords = List.of(
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                        CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(2));
+        logOffset = offset;
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Should see no connectors and no task configs
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(Set.of(), configState.connectors());
+        assertEquals(0, configState.taskCount(CONNECTOR_1_NAME));
+        assertNull(configState.rawTaskConfig(TASK_IDS.get(0)));
+        assertNull(configState.rawTaskConfig(TASK_IDS.get(1)));
+
+        // Probe internal collections just to be sure
+        assertEquals(Map.of(), configState.connectorConfigs);
+        assertEquals(Map.of(), configState.taskConfigs);
+        assertEquals(Map.of(), configState.connectorTaskCounts);
+
+        // Exception: we still include task count records, for the unlikely-but-possible case
+        // where there are still zombie instances of the tasks for this long-deleted connector
+        // running somewhere on the cluster
+        assertEquals(2, (int) configState.taskCountRecord(CONNECTOR_1_NAME));
+    }
+
     @Test
     public void testRecordToRestartRequest() {
         ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),