diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java index c8bdea1fd8c..34827ba7dae 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java @@ -38,17 +38,18 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.RestartRequest; +import org.apache.kafka.connect.runtime.SessionKey; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; -import org.apache.kafka.connect.util.TestFuture; import org.apache.kafka.connect.util.TopicAdmin; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.AdditionalMatchers; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -64,6 +65,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -86,6 +88,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -235,8 +238,7 @@ public class KafkaConfigBackingStoreMockitoTest { props.put("config.storage.max.message.bytes", "1001"); createStore(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); @@ -265,8 +267,8 @@ public class KafkaConfigBackingStoreMockitoTest { props.put("config.storage.min.insync.replicas", "3"); props.put("config.storage.max.message.bytes", "1001"); createStore(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); + + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -285,8 +287,7 @@ public class KafkaConfigBackingStoreMockitoTest { @Test public void testPutConnectorConfig() throws Exception { - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -337,8 +338,10 @@ public class KafkaConfigBackingStoreMockitoTest { verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1)); // Config deletion - expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null); - expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null); + when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(null); + when(converter.toConnectData(TOPIC, null)).thenReturn(new SchemaAndValue(null, null)); + when(configLog.sendWithReceipt(AdditionalMatchers.or(Mockito.eq(configKey), Mockito.eq(targetStateKey)), + Mockito.isNull())).thenReturn(producerFuture); // Deletion should remove the second one we added configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1)); @@ -356,8 +359,7 @@ public class KafkaConfigBackingStoreMockitoTest { @Test public void testPutConnectorConfigWithTargetState() throws Exception { - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -402,8 +404,7 @@ public class KafkaConfigBackingStoreMockitoTest { @Test public void testPutConnectorConfigProducerError() throws Exception { - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -433,8 +434,7 @@ public class KafkaConfigBackingStoreMockitoTest { @Test public void testRemoveConnectorConfigSlowProducer() throws Exception { - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -486,8 +486,7 @@ public class KafkaConfigBackingStoreMockitoTest { props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); createStore(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -507,7 +506,8 @@ public class KafkaConfigBackingStoreMockitoTest { doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)))) .doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)))) .when(configLog).readToEnd(); - expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)); + when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0))) + .thenReturn(new SchemaAndValue(null, structToMap(CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)))); // Should succeed now configStorage.claimWritePrivileges(); @@ -583,7 +583,7 @@ public class KafkaConfigBackingStoreMockitoTest { logOffset = 5; expectStart(existingRecords, deserialized); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -592,7 +592,7 @@ public class KafkaConfigBackingStoreMockitoTest { // The target state deletion should reset the state to STARTED ClusterConfigState configState = configStorage.snapshot(); assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); configStorage.stop(); @@ -627,7 +627,7 @@ public class KafkaConfigBackingStoreMockitoTest { expectStart(existingRecords, deserialized); // Shouldn't see any callbacks since this is during startup - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -636,7 +636,7 @@ public class KafkaConfigBackingStoreMockitoTest { // Should see a single connector with initial state paused ClusterConfigState configState = configStorage.snapshot(); assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0))); assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1))); @@ -683,7 +683,7 @@ public class KafkaConfigBackingStoreMockitoTest { logOffset = 9; expectStart(existingRecords, deserialized); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -692,7 +692,7 @@ public class KafkaConfigBackingStoreMockitoTest { // Should see a single connector and its config should be the last one seen anywhere in the log ClusterConfigState configState = configStorage.snapshot(); assertEquals(logOffset, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); @@ -740,7 +740,7 @@ public class KafkaConfigBackingStoreMockitoTest { logOffset = 6; expectStart(existingRecords, deserialized); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -791,7 +791,7 @@ public class KafkaConfigBackingStoreMockitoTest { deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR); logOffset = 8; expectStart(existingRecords, deserialized); - expectPartitionCount(1); + when(configLog.partitionCount()).thenReturn(1); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); verifyConfigure(); @@ -897,8 +897,7 @@ public class KafkaConfigBackingStoreMockitoTest { @Test public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() { - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(2); + when(configLog.partitionCount()).thenReturn(2); configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start()); @@ -961,6 +960,291 @@ public class KafkaConfigBackingStoreMockitoTest { assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)); } + @Test + public void testBackgroundConnectorDeletion() throws Exception { + // verify that we handle connector deletions correctly when they come up through the log + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + when(configLog.partitionCount()).thenReturn(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + + configStorage.start(); + verify(configLog).start(); + + // Should see a single connector with initial state paused + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); + assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 0))); + assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 1))); + assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0))); + + LinkedHashMap serializedData = new LinkedHashMap<>(); + serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); + serializedData.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1)); + doAnswer(expectReadToEnd(serializedData)).when(configLog).readToEnd(); + + Map deserializedData = new HashMap<>(); + deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null); + deserializedData.put(TARGET_STATE_KEYS.get(0), null); + expectRead(serializedData, deserializedData); + + configStorage.refresh(0, TimeUnit.SECONDS); + verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(0)); + + configState = configStorage.snapshot(); + // Connector should now be removed from the snapshot + assertFalse(configState.contains(CONNECTOR_IDS.get(0))); + assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0))); + // Ensure that the deleted connector's deferred task updates have been cleaned up + // in order to prevent unbounded growth of the map + assertEquals(Collections.emptyMap(), configStorage.deferredTaskUpdates); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception { + // Test a case where a failure and compaction has left us in an inconsistent state when reading the log. + // We start out by loading an initial configuration where we started to write a task update, and then + // compaction cleaned up the earlier record. + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + // This is the record that has been compacted: + //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1)); + logOffset = 6; + expectStart(existingRecords, deserialized); + when(configLog.partitionCount()).thenReturn(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // After reading the log, it should have been in an inconsistent state + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(6, configState.offset()); // Should always be next to be read, not last committed + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list + assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0))); + // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] + assertNull(configState.taskConfig(TASK_IDS.get(0))); + assertNull(configState.taskConfig(TASK_IDS.get(1))); + assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors()); + + // Records to be read by consumer as it reads to the end of the log + LinkedHashMap serializedConfigs = new LinkedHashMap<>(); + serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); + serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2)); + // Successful attempt to write new task config + doAnswer(expectReadToEnd(new LinkedHashMap<>())) + .doAnswer(expectReadToEnd(new LinkedHashMap<>())) + .doAnswer(expectReadToEnd(serializedConfigs)) + .when(configLog).readToEnd(); + expectConvertWriteRead( + TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), + "properties", SAMPLE_CONFIGS.get(0)); + expectConvertWriteRead( + COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), + "tasks", 1); // Updated to just 1 task + + // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case + // we are going to shrink the number of tasks to 1 + configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0))); + + // Validate updated config + configState = configStorage.snapshot(); + // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written + // to the topic. Only the last call with 1 task config + 1 commit actually gets written. + assertEquals(8, configState.offset()); + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0))); + assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); + + // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks + verify(configUpdateListener).onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(0))); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testPutRestartRequestOnlyFailed() throws Exception { + RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, false); + testPutRestartRequest(restartRequest); + } + + @Test + public void testPutRestartRequestOnlyFailedIncludingTasks() throws Exception { + RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, true); + testPutRestartRequest(restartRequest); + } + + private void testPutRestartRequest(RestartRequest restartRequest) throws Exception { + expectStart(Collections.emptyList(), Collections.emptyMap()); + when(configLog.partitionCount()).thenReturn(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + + configStorage.start(); + verify(configLog).start(); + + expectConvertWriteRead( + RESTART_CONNECTOR_KEYS.get(0), KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0), + ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed()); + + LinkedHashMap recordsToRead = new LinkedHashMap<>(); + recordsToRead.put(RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); + doAnswer(expectReadToEnd(recordsToRead)).when(configLog).readToEnd(); + + // Writing should block until it is written and read back from Kafka + configStorage.putRestartRequest(restartRequest); + + final ArgumentCaptor restartRequestCaptor = ArgumentCaptor.forClass(RestartRequest.class); + verify(configUpdateListener).onRestartRequest(restartRequestCaptor.capture()); + + assertEquals(restartRequest.connectorName(), restartRequestCaptor.getValue().connectorName()); + assertEquals(restartRequest.onlyFailed(), restartRequestCaptor.getValue().onlyFailed()); + assertEquals(restartRequest.includeTasks(), restartRequestCaptor.getValue().includeTasks()); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testRestoreRestartRequestInconsistentState() { + // Restoring data should notify only of the latest values after loading is complete. This also validates + // that inconsistent state doesn't prevent startup. + // Overwrite each type at least once to ensure we see the latest data after loading + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), RESTART_REQUEST_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), RESTART_REQUEST_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(2), RESTART_REQUEST_STRUCTS.get(2)); + deserialized.put(CONFIGS_SERIALIZED.get(3), null); + logOffset = 4; + expectStart(existingRecords, deserialized); + when(configLog.partitionCount()).thenReturn(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + + configStorage.start(); + verify(configLog).start(); + + // Shouldn't see any callbacks since this is during startup + verify(configUpdateListener, never()).onConnectorConfigRemove(anyString()); + verify(configUpdateListener, never()).onConnectorConfigUpdate(anyString()); + verify(configUpdateListener, never()).onTaskConfigUpdate(anyCollection()); + verify(configUpdateListener, never()).onConnectorTargetStateChange(anyString()); + verify(configUpdateListener, never()).onSessionKeyUpdate(any(SessionKey.class)); + verify(configUpdateListener, never()).onRestartRequest(any(RestartRequest.class)); + verify(configUpdateListener, never()).onLoggingLevelUpdate(anyString(), anyString()); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testPutLogLevel() throws Exception { + final String logger1 = "org.apache.zookeeper"; + final String logger2 = "org.apache.cassandra"; + final String logger3 = "org.apache.kafka.clients"; + final String logger4 = "org.apache.kafka.connect"; + final String level1 = "ERROR"; + final String level3 = "WARN"; + final String level4 = "DEBUG"; + + final Struct existingLogLevel = new Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0) + .put("level", level1); + + // Pre-populate the config topic with a couple of logger level records; these should be ignored (i.e., + // not reported to the update listener) + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger1, + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty() + ), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger2, + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty() + ) + ); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), existingLogLevel); + // Make sure we gracefully handle tombstones + deserialized.put(CONFIGS_SERIALIZED.get(1), null); + logOffset = 2; + + expectStart(existingRecords, deserialized); + when(configLog.partitionCount()).thenReturn(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + + configStorage.start(); + verify(configLog).start(); + + expectConvertWriteRead( + "logger-cluster-" + logger3, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2), + "level", level3); + configStorage.putLoggerLevel(logger3, level3); + + expectConvertWriteRead( + "logger-cluster-" + logger4, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3), + "level", level4); + configStorage.putLoggerLevel(logger4, level4); + + LinkedHashMap newRecords = new LinkedHashMap<>(); + newRecords.put("logger-cluster-" + logger3, CONFIGS_SERIALIZED.get(2)); + newRecords.put("logger-cluster-" + logger4, CONFIGS_SERIALIZED.get(3)); + doAnswer(expectReadToEnd(newRecords)).when(configLog).readToEnd(); + + configStorage.refresh(0, TimeUnit.SECONDS); + verify(configUpdateListener).onLoggingLevelUpdate(logger3, level3); + verify(configUpdateListener).onLoggingLevelUpdate(logger4, level4); + + configStorage.stop(); + verify(configLog).stop(); + } + private void verifyConfigure() { verify(configStorage).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(), capturedConsumerProps.capture(), capturedConsumedCallback.capture(), @@ -984,36 +1268,22 @@ public class KafkaConfigBackingStoreMockitoTest { } } - private void expectPartitionCount(int partitionCount) { - when(configLog.partitionCount()).thenReturn(partitionCount); - } - // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back // from the log. Validate the data that is captured when the conversion is performed matches the specified data // (by checking a single field's value) private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized, final String dataFieldName, final Object dataFieldValue) throws Exception { final ArgumentCaptor capturedRecord = ArgumentCaptor.forClass(Struct.class); - if (serialized != null) - when(converter.fromConnectData(eq(TOPIC), eq(valueSchema), capturedRecord.capture())) - .thenReturn(serialized); - + when(converter.fromConnectData(eq(TOPIC), eq(valueSchema), capturedRecord.capture())).thenReturn(serialized); when(configLog.sendWithReceipt(configKey, serialized)).thenReturn(producerFuture); when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(null); - when(converter.toConnectData(TOPIC, serialized)).thenAnswer((Answer) invocation -> { - if (dataFieldName != null) - assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName)); + when(converter.toConnectData(TOPIC, serialized)).thenAnswer(invocation -> { + assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName)); // Note null schema because default settings for internal serialization are schema-less - return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue())); + return new SchemaAndValue(null, structToMap(capturedRecord.getValue())); }); } - private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) { - LinkedHashMap serializedData = new LinkedHashMap<>(); - serializedData.put(key, serializedValue); - expectRead(serializedData, Collections.singletonMap(key, deserializedValue)); - } - private void expectRead(LinkedHashMap serializedValues, Map deserializedValues) { for (Map.Entry deserializedValueEntry : deserializedValues.entrySet()) { @@ -1026,14 +1296,14 @@ public class KafkaConfigBackingStoreMockitoTest { // This map needs to maintain ordering private Answer> expectReadToEnd(final Map serializedConfigs) { return invocation -> { - TestFuture future = new TestFuture<>(); for (Map.Entry entry : serializedConfigs.entrySet()) { capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0, 0, entry.getKey(), entry.getValue(), new RecordHeaders(), Optional.empty())); } - future.resolveOnGet((Void) null); - return future; + CompletableFuture f = new CompletableFuture<>(); + f.complete(null); + return f; }; } @@ -1041,10 +1311,8 @@ public class KafkaConfigBackingStoreMockitoTest { private Map structToMap(Struct struct) { if (struct == null) return null; - - HashMap result = new HashMap<>(); - for (Field field : struct.schema().fields()) - result.put(field.name(), struct.get(field)); + Map result = new HashMap<>(); + for (Field field : struct.schema().fields()) result.put(field.name(), struct.get(field)); return result; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 637dac4d679..61e73df2706 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; @@ -591,299 +590,6 @@ public class KafkaConfigBackingStoreTest { PowerMock.verifyAll(); } - @Test - public void testBackgroundConnectorDeletion() throws Exception { - // verify that we handle connector deletions correctly when they come up through the log - - expectConfigure(); - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1)); - deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - logOffset = 5; - - expectStart(existingRecords, deserialized); - - LinkedHashMap serializedData = new LinkedHashMap<>(); - serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); - serializedData.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1)); - - Map deserializedData = new HashMap<>(); - deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null); - deserializedData.put(TARGET_STATE_KEYS.get(0), null); - - expectRead(serializedData, deserializedData); - - configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0)); - EasyMock.expectLastCall(); - - expectPartitionCount(1); - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should see a single connector with initial state paused - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 0))); - assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 1))); - assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0))); - - configStorage.refresh(0, TimeUnit.SECONDS); - configState = configStorage.snapshot(); - // Connector should now be removed from the snapshot - assertFalse(configState.contains(CONNECTOR_IDS.get(0))); - assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0))); - // Ensure that the deleted connector's deferred task updates have been cleaned up - // in order to prevent unbounded growth of the map - assertEquals(Collections.emptyMap(), configStorage.deferredTaskUpdates); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception { - // Test a case where a failure and compaction has left us in an inconsistent state when reading the log. - // We start out by loading an initial configuration where we started to write a task update, and then - // compaction cleaned up the earlier record. - - expectConfigure(); - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - // This is the record that has been compacted: - //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1)); - logOffset = 6; - expectStart(existingRecords, deserialized); - expectPartitionCount(1); - - // Successful attempt to write new task config - expectReadToEnd(new LinkedHashMap<>()); - expectConvertWriteRead( - TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), - "properties", SAMPLE_CONFIGS.get(0)); - expectReadToEnd(new LinkedHashMap<>()); - expectConvertWriteRead( - COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), - "tasks", 1); // Updated to just 1 task - // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks - configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0))); - EasyMock.expectLastCall(); - // Records to be read by consumer as it reads to the end of the log - LinkedHashMap serializedConfigs = new LinkedHashMap<>(); - serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); - serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2)); - expectReadToEnd(serializedConfigs); - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - // After reading the log, it should have been in an inconsistent state - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(6, configState.offset()); // Should always be next to be read, not last committed - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list - assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0))); - // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] - assertNull(configState.taskConfig(TASK_IDS.get(0))); - assertNull(configState.taskConfig(TASK_IDS.get(1))); - assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors()); - - // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case - // we are going to shrink the number of tasks to 1 - configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0))); - // Validate updated config - configState = configStorage.snapshot(); - // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written - // to the topic. Only the last call with 1 task config + 1 commit actually gets written. - assertEquals(8, configState.offset()); - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); - assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutRestartRequestOnlyFailed() throws Exception { - RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, false); - testPutRestartRequest(restartRequest); - } - - @Test - public void testPutRestartRequestOnlyFailedIncludingTasks() throws Exception { - RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, true); - testPutRestartRequest(restartRequest); - } - - private void testPutRestartRequest(RestartRequest restartRequest) throws Exception { - expectConfigure(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - - expectConvertWriteAndRead( - RESTART_CONNECTOR_KEYS.get(0), KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0), - ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed()); - final Capture capturedRestartRequest = EasyMock.newCapture(); - configUpdateListener.onRestartRequest(EasyMock.capture(capturedRestartRequest)); - EasyMock.expectLastCall(); - - expectPartitionCount(1); - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Writing should block until it is written and read back from Kafka - configStorage.putRestartRequest(restartRequest); - - assertEquals(restartRequest.connectorName(), capturedRestartRequest.getValue().connectorName()); - assertEquals(restartRequest.onlyFailed(), capturedRestartRequest.getValue().onlyFailed()); - assertEquals(restartRequest.includeTasks(), capturedRestartRequest.getValue().includeTasks()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testRestoreRestartRequestInconsistentState() throws Exception { - // Restoring data should notify only of the latest values after loading is complete. This also validates - // that inconsistent state doesnt prevent startup. - - expectConfigure(); - // Overwrite each type at least once to ensure we see the latest data after loading - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), RESTART_REQUEST_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), RESTART_REQUEST_STRUCTS.get(1)); - deserialized.put(CONFIGS_SERIALIZED.get(2), RESTART_REQUEST_STRUCTS.get(2)); - deserialized.put(CONFIGS_SERIALIZED.get(3), null); - logOffset = 4; - expectStart(existingRecords, deserialized); - expectPartitionCount(1); - - // Shouldn't see any callbacks since this is during startup - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutLogLevel() throws Exception { - final String logger1 = "org.apache.zookeeper"; - final String logger2 = "org.apache.cassandra"; - final String logger3 = "org.apache.kafka.clients"; - final String logger4 = "org.apache.kafka.connect"; - final String level1 = "ERROR"; - final String level3 = "WARN"; - final String level4 = "DEBUG"; - - final Struct existingLogLevel = new Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0) - .put("level", level1); - - // Pre-populate the config topic with a couple of logger level records; these should be ignored (i.e., - // not reported to the update listener) - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger1, - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty() - ), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger2, - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty() - ) - ); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), existingLogLevel); - // Make sure we gracefully handle tombstones - deserialized.put(CONFIGS_SERIALIZED.get(1), null); - logOffset = 2; - - expectConfigure(); - expectStart(existingRecords, deserialized); - expectPartitionCount(1); - expectStop(); - - expectConvertWriteRead( - "logger-cluster-" + logger3, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2), - "level", level3); - expectConvertWriteRead( - "logger-cluster-" + logger4, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3), - "level", level4); - - LinkedHashMap newRecords = new LinkedHashMap<>(); - newRecords.put("logger-cluster-" + logger3, CONFIGS_SERIALIZED.get(2)); - newRecords.put("logger-cluster-" + logger4, CONFIGS_SERIALIZED.get(3)); - expectReadToEnd(newRecords); - - configUpdateListener.onLoggingLevelUpdate(logger3, level3); - EasyMock.expectLastCall(); - configUpdateListener.onLoggingLevelUpdate(logger4, level4); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - configStorage.putLoggerLevel(logger3, level3); - configStorage.putLoggerLevel(logger4, level4); - configStorage.refresh(0, TimeUnit.SECONDS); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - private void expectConfigure() throws Exception { PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), @@ -975,14 +681,6 @@ public class KafkaConfigBackingStoreTest { }); } - private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized, - final String dataFieldName, final Object dataFieldValue) throws Exception { - expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue); - LinkedHashMap recordsToRead = new LinkedHashMap<>(); - recordsToRead.put(configKey, serialized); - expectReadToEnd(recordsToRead); - } - // Manually insert a connector into config storage, updating the task configs, connector config, and root config private void whiteboxAddConnector(String connectorName, Map connectorConfig, List> taskConfigs) { Map> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");