diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java index 6ebac341032..3ec037734f1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java @@ -61,6 +61,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -159,7 +160,7 @@ public class KafkaConfigBackingStoreMockitoTest { new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)) ); - + private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED"); private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1) .put("state", "PAUSED") .put("state.v2", "PAUSED"); @@ -1184,6 +1185,147 @@ public class KafkaConfigBackingStoreMockitoTest { verify(configLog).stop(); } + @Test + public void testPutTaskConfigsZeroTasks() throws Exception { + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + verify(configLog).start(); + + // Records to be read by consumer as it reads to the end of the log + doAnswer(expectReadToEnd(new LinkedHashMap<>())). + doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)))) + .when(configLog).readToEnd(); + + expectConvertWriteRead( + COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), + "tasks", 0); // We have 0 tasks + + // Bootstrap as if we had already added the connector, but no tasks had been added yet + addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList()); + + + // Null before writing + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(-1, configState.offset()); + + // Writing task configs should block until all the writes have been performed and the root record update + // has completed + List> taskConfigs = Collections.emptyList(); + configStorage.putTaskConfigs("connector1", taskConfigs); + + // Validate root config by listing all connectors and tasks + configState = configStorage.snapshot(); + assertEquals(1, configState.offset()); + String connectorName = CONNECTOR_IDS.get(0); + assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors())); + assertEquals(Collections.emptyList(), configState.tasks(connectorName)); + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); + + // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks + verify(configUpdateListener).onTaskConfigUpdate(Collections.emptyList()); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testBackgroundUpdateTargetState() throws Exception { + // verify that we handle target state changes correctly when they come up through the log + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserializedOnStartup = new LinkedHashMap<>(); + deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserializedOnStartup); + when(configLog.partitionCount()).thenReturn(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + verify(configLog).start(); + + // Should see a single connector with initial state started + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configStorage.connectorTargetStates.keySet()); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + + LinkedHashMap serializedAfterStartup = new LinkedHashMap<>(); + serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); + serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1)); + doAnswer(expectReadToEnd(serializedAfterStartup)).when(configLog).readToEnd(); + + Map deserializedAfterStartup = new HashMap<>(); + deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED); + deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED); + expectRead(serializedAfterStartup, deserializedAfterStartup); + + // Should see two connectors now, one paused and one stopped + configStorage.refresh(0, TimeUnit.SECONDS); + verify(configUpdateListener).onConnectorTargetStateChange(CONNECTOR_IDS.get(0)); + configState = configStorage.snapshot(); + + assertEquals(new HashSet<>(CONNECTOR_IDS), configStorage.connectorTargetStates.keySet()); + assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0))); + assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1))); + + configStorage.stop(); + verify(configStorage).stop(); + } + + @Test + public void testSameTargetState() throws Exception { + // verify that we handle target state changes correctly when they come up through the log + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + + when(configLog.partitionCount()).thenReturn(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + verify(configLog).start(); + + ClusterConfigState configState = configStorage.snapshot(); + expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED); + // Should see a single connector with initial state paused + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + + expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED); + // on resume update listener shouldn't be called + verify(configUpdateListener, never()).onConnectorTargetStateChange(anyString()); + + configStorage.stop(); + verify(configStorage).stop(); + } + + @Test public void testPutLogLevel() throws Exception { final String logger1 = "org.apache.zookeeper"; @@ -1293,6 +1435,12 @@ public class KafkaConfigBackingStoreMockitoTest { } } + private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) { + LinkedHashMap serializedData = new LinkedHashMap<>(); + serializedData.put(key, serializedValue); + expectRead(serializedData, Collections.singletonMap(key, deserializedValue)); + } + // This map needs to maintain ordering private Answer> expectReadToEnd(final Map serializedConfigs) { return invocation -> { @@ -1315,4 +1463,11 @@ public class KafkaConfigBackingStoreMockitoTest { for (Field field : struct.schema().fields()) result.put(field.name(), struct.get(field)); return result; } + + private void addConnector(String connectorName, Map connectorConfig, List> taskConfigs) { + for (int i = 0; i < taskConfigs.size(); i++) + configStorage.taskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i)); + configStorage.connectorConfigs.put(connectorName, connectorConfig); + configStorage.connectorTaskCounts.put(connectorName, taskConfigs.size()); + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index ae5f82cd3ee..2e7b388413c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; @@ -52,13 +51,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME; @@ -430,167 +427,6 @@ public class KafkaConfigBackingStoreTest { PowerMock.verifyAll(); } - @Test - public void testPutTaskConfigsZeroTasks() throws Exception { - expectConfigure(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - - // Task configs should read to end, write to the log, read to end, write root. - expectReadToEnd(new LinkedHashMap<>()); - expectConvertWriteRead( - COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), - "tasks", 0); // We have 0 tasks - // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks - configUpdateListener.onTaskConfigUpdate(Collections.emptyList()); - EasyMock.expectLastCall(); - - // Records to be read by consumer as it reads to the end of the log - LinkedHashMap serializedConfigs = new LinkedHashMap<>(); - serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); - expectReadToEnd(serializedConfigs); - - expectPartitionCount(1); - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Bootstrap as if we had already added the connector, but no tasks had been added yet - whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList()); - - // Null before writing - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(-1, configState.offset()); - - // Writing task configs should block until all the writes have been performed and the root record update - // has completed - List> taskConfigs = Collections.emptyList(); - configStorage.putTaskConfigs("connector1", taskConfigs); - - // Validate root config by listing all connectors and tasks - configState = configStorage.snapshot(); - assertEquals(1, configState.offset()); - String connectorName = CONNECTOR_IDS.get(0); - assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors())); - assertEquals(Collections.emptyList(), configState.tasks(connectorName)); - assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testBackgroundUpdateTargetState() throws Exception { - // verify that we handle target state changes correctly when they come up through the log - - expectConfigure(); - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserializedOnStartup = new LinkedHashMap<>(); - deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - logOffset = 5; - - expectStart(existingRecords, deserializedOnStartup); - - LinkedHashMap serializedAfterStartup = new LinkedHashMap<>(); - serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); - serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1)); - - Map deserializedAfterStartup = new HashMap<>(); - deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED); - deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED); - - expectRead(serializedAfterStartup, deserializedAfterStartup); - - configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0)); - EasyMock.expectLastCall(); - - expectPartitionCount(1); - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should see a single connector with initial state started - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configStorage.connectorTargetStates.keySet()); - assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); - - // Should see two connectors now, one paused and one stopped - configStorage.refresh(0, TimeUnit.SECONDS); - configState = configStorage.snapshot(); - assertEquals(new HashSet<>(CONNECTOR_IDS), configStorage.connectorTargetStates.keySet()); - assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0))); - assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1))); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testSameTargetState() throws Exception { - // verify that we handle target state changes correctly when they come up through the log - - expectConfigure(); - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - logOffset = 5; - - expectStart(existingRecords, deserialized); - - // on resume update listener shouldn't be called - configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString()); - EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected call to onConnectorTargetStateChange")); - - expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED); - - expectPartitionCount(1); - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should see a single connector with initial state paused - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); - - configStorage.refresh(0, TimeUnit.SECONDS); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - private void expectConfigure() throws Exception { PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), @@ -636,12 +472,6 @@ public class KafkaConfigBackingStoreTest { } } - private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) { - LinkedHashMap serializedData = new LinkedHashMap<>(); - serializedData.put(key, serializedValue); - expectRead(serializedData, Collections.singletonMap(key, deserializedValue)); - } - // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back // from the log. Validate the data that is captured when the conversion is performed matches the specified data // (by checking a single field's value)