mirror of https://github.com/apache/kafka.git
MINOR: add testRestoreCompactedDeletedConnector back to KafkaConfigBackingStoreTest (#18392)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
058f0a94c8
commit
a97fb662fd
|
@ -69,6 +69,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
@ -174,7 +175,8 @@ public class KafkaConfigBackingStoreTest {
|
|||
.put("state.v2", "STOPPED");
|
||||
private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
|
||||
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
|
||||
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
|
||||
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9),
|
||||
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 2)
|
||||
);
|
||||
|
||||
// The exact format doesn't matter here since both conversions are mocked
|
||||
|
@ -818,6 +820,56 @@ public class KafkaConfigBackingStoreTest {
|
|||
verify(configLog).stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestoreCompactedDeletedConnector() {
|
||||
// When a connector is deleted, we emit a tombstone record for its config (with key
|
||||
// "connector-<name>") and its target state (with key "target-state-<name>"), but not
|
||||
// for its task configs
|
||||
// As a result, we need to carefully handle the case where task configs are present in
|
||||
// the config topic for a connector, but there is no accompanying config for the
|
||||
// connector itself
|
||||
|
||||
int offset = 0;
|
||||
List<ConsumerRecord<String, byte[]>> existingRecords = List.of(
|
||||
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
|
||||
TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
|
||||
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
|
||||
TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
|
||||
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
|
||||
COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
|
||||
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
|
||||
CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
|
||||
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
|
||||
deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
|
||||
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
|
||||
deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
|
||||
deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(2));
|
||||
logOffset = offset;
|
||||
expectStart(existingRecords, deserialized);
|
||||
when(configLog.partitionCount()).thenReturn(1);
|
||||
|
||||
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
|
||||
verifyConfigure();
|
||||
configStorage.start();
|
||||
|
||||
// Should see no connectors and no task configs
|
||||
ClusterConfigState configState = configStorage.snapshot();
|
||||
assertEquals(Set.of(), configState.connectors());
|
||||
assertEquals(0, configState.taskCount(CONNECTOR_1_NAME));
|
||||
assertNull(configState.rawTaskConfig(TASK_IDS.get(0)));
|
||||
assertNull(configState.rawTaskConfig(TASK_IDS.get(1)));
|
||||
|
||||
// Probe internal collections just to be sure
|
||||
assertEquals(Map.of(), configState.connectorConfigs);
|
||||
assertEquals(Map.of(), configState.taskConfigs);
|
||||
assertEquals(Map.of(), configState.connectorTaskCounts);
|
||||
|
||||
// Exception: we still include task count records, for the unlikely-but-possible case
|
||||
// where there are still zombie instances of the tasks for this long-deleted connector
|
||||
// running somewhere on the cluster
|
||||
assertEquals(2, (int) configState.taskCountRecord(CONNECTOR_1_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordToRestartRequest() {
|
||||
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
|
|
Loading…
Reference in New Issue