From 41f5bf844df1ee94ab69c341d216a334a0d8c7f7 Mon Sep 17 00:00:00 2001 From: Hector Geraldino Date: Sun, 5 May 2024 20:47:17 -0400 Subject: [PATCH] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) (#15841) Reviewers: Chia-Ping Tsai --- .../KafkaConfigBackingStoreMockitoTest.java | 561 +++++++++++++++- .../storage/KafkaConfigBackingStoreTest.java | 604 ------------------ 2 files changed, 554 insertions(+), 611 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java index 37d1c55066f..c8bdea1fd8c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java @@ -20,9 +20,14 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.MockTime; @@ -31,11 +36,13 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TestFuture; import org.apache.kafka.connect.util.TopicAdmin; @@ -48,6 +55,8 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -55,6 +64,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -67,18 +77,25 @@ import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG; import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME; import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME; +import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS; import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -102,13 +119,31 @@ public class KafkaConfigBackingStoreMockitoTest { private static final List CONNECTOR_IDS = Arrays.asList("connector1", "connector2"); private static final List CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2"); + private static final List COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2"); + private static final List TARGET_STATE_KEYS = Arrays.asList("target-state-connector1", "target-state-connector2"); - - + private static final List CONNECTOR_TASK_COUNT_RECORD_KEYS = Arrays.asList("tasks-fencing-connector1", "tasks-fencing-connector2"); private static final String CONNECTOR_1_NAME = "connector1"; private static final String CONNECTOR_2_NAME = "connector2"; private static final List RESTART_CONNECTOR_KEYS = Arrays.asList(RESTART_KEY(CONNECTOR_1_NAME), RESTART_KEY(CONNECTOR_2_NAME)); + // Need a) connector with multiple tasks and b) multiple connectors + private static final List TASK_IDS = Arrays.asList( + new ConnectorTaskId("connector1", 0), + new ConnectorTaskId("connector1", 1), + new ConnectorTaskId("connector2", 0) + ); + private static final List TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0"); + // Need some placeholders -- the contents don't matter here, just that they are restored properly + private static final List> SAMPLE_CONFIGS = Arrays.asList( + Collections.singletonMap("config-key-one", "config-value-one"), + Collections.singletonMap("config-key-two", "config-value-two"), + Collections.singletonMap("config-key-three", "config-value-three") + ); + private static final List TASK_CONFIG_STRUCTS = Arrays.asList( + new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), + new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)) + ); private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false); private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true); private static final List RESTART_REQUEST_STRUCTS = Arrays.asList( @@ -116,12 +151,23 @@ public class KafkaConfigBackingStoreMockitoTest { ONLY_FAILED_MISSING_STRUCT, INCLUDE_TASKS_MISSING_STRUCT); + private static final List CONNECTOR_CONFIG_STRUCTS = Arrays.asList( + new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), + new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), + new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)) + ); - // Need some placeholders -- the contents don't matter here, just that they are restored properly - private static final List> SAMPLE_CONFIGS = Arrays.asList( - Collections.singletonMap("config-key-one", "config-value-one"), - Collections.singletonMap("config-key-two", "config-value-two"), - Collections.singletonMap("config-key-three", "config-value-three") + private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1) + .put("state", "PAUSED") + .put("state.v2", "PAUSED"); + private static final Struct TARGET_STATE_PAUSED_LEGACY = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0) + .put("state", "PAUSED"); + private static final Struct TARGET_STATE_STOPPED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1) + .put("state", "PAUSED") + .put("state.v2", "STOPPED"); + private static final List CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList( + new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6), + new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9) ); // The exact format doesn't matter here since both conversions are mocked @@ -130,6 +176,11 @@ public class KafkaConfigBackingStoreMockitoTest { "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(), "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes() ); + private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR + = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2); + + private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR + = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0); private static final List TARGET_STATES_SERIALIZED = Arrays.asList( "started".getBytes(), "paused".getBytes(), "stopped".getBytes() @@ -143,6 +194,8 @@ public class KafkaConfigBackingStoreMockitoTest { @Mock KafkaBasedLog configLog; @Mock + Producer fencableProducer; + @Mock Future producerFuture; private KafkaConfigBackingStore configStorage; @@ -347,6 +400,419 @@ public class KafkaConfigBackingStoreMockitoTest { verify(configLog).stop(); } + @Test + public void testPutConnectorConfigProducerError() throws Exception { + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))) + .thenReturn(CONFIGS_SERIALIZED.get(0)); + when(configLog.sendWithReceipt(anyString(), any(byte[].class))).thenReturn(producerFuture); + + // Verify initial state + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(-1, configState.offset()); + assertEquals(0, configState.connectors().size()); + + Exception thrownException = new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"))); + when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(thrownException); + + // verify that the producer exception from KafkaBasedLog::send is propagated + ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), + SAMPLE_CONFIGS.get(0), null)); + assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka")); + assertEquals(thrownException, e.getCause()); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testRemoveConnectorConfigSlowProducer() throws Exception { + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + @SuppressWarnings("unchecked") + Future connectorConfigProducerFuture = mock(Future.class); + + @SuppressWarnings("unchecked") + Future targetStateProducerFuture = mock(Future.class); + + when(configLog.sendWithReceipt(anyString(), isNull())) + // tombstone for the connector config + .thenReturn(connectorConfigProducerFuture) + // tombstone for the connector target state + .thenReturn(targetStateProducerFuture); + + when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS), any(TimeUnit.class))) + .thenAnswer((Answer) invocation -> { + time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000); + return null; + }); + + // the future get timeout is expected to be reduced according to how long the previous Future::get took + when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class))) + .thenAnswer((Answer) invocation -> { + time.sleep(1000); + return null; + }); + + @SuppressWarnings("unchecked") + Future future = mock(Future.class); + when(configLog.readToEnd()).thenReturn(future); + + // the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the + // timeout on the log read future to be 0 + when(future.get(eq(0L), any(TimeUnit.class))).thenReturn(null); + + configStorage.removeConnectorConfig("test-connector"); + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + @SuppressWarnings("unchecked") + public void testWritePrivileges() throws Exception { + // With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer + // to write some types of messages to the config topic + props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); + createStore(); + + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // Try and fail to write a task count record to the config topic without write privileges + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))) + .thenReturn(CONFIGS_SERIALIZED.get(0)); + + // Should fail the first time since we haven't claimed write privileges + assertThrows(IllegalStateException.class, () -> configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6)); + + // Claim write privileges + doReturn(fencableProducer).when(configStorage).createFencableProducer(); + // And write the task count record successfully + when(fencableProducer.send(any(ProducerRecord.class))).thenReturn(null); + doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)))) + .doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)))) + .when(configLog).readToEnd(); + expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)); + + // Should succeed now + configStorage.claimWritePrivileges(); + configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6); + + verify(fencableProducer).beginTransaction(); + verify(fencableProducer).commitTransaction(); + + // Try to write a connector config + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))) + .thenReturn(CONFIGS_SERIALIZED.get(1)); + // Get fenced out + doThrow(new ProducerFencedException("Better luck next time")) + .doNothing() + .when(fencableProducer).commitTransaction(); + + // Should fail again when we get fenced out + assertThrows(PrivilegedWriteException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null)); + + verify(fencableProducer, times(2)).beginTransaction(); + verify(fencableProducer).close(Duration.ZERO); + + // Should fail if we retry without reclaiming write privileges + assertThrows(IllegalStateException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null)); + + // In the meantime, write a target state (which doesn't require write privileges) + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED)) + .thenReturn(CONFIGS_SERIALIZED.get(1)); + when(configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1))) + .thenReturn(producerFuture); + when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(null); + + // Should succeed even without write privileges (target states can be written by anyone) + configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED); + + // Reclaim write privileges and successfully write the config + when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2))) + .thenReturn(new SchemaAndValue(null, structToMap(CONNECTOR_CONFIG_STRUCTS.get(0)))); + + // Should succeed if we re-claim write privileges + configStorage.claimWritePrivileges(); + configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null); + + verify(fencableProducer, times(3)).beginTransaction(); + verify(fencableProducer, times(3)).commitTransaction(); + verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1)); + + configStorage.stop(); + verify(configLog).stop(); + verify(configStorage, times(2)).createFencableProducer(); + verify(fencableProducer, times(2)).close(Duration.ZERO); + } + + @Test + public void testRestoreTargetStateUnexpectedDeletion() { + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), null); + deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // The target state deletion should reset the state to STARTED + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testRestoreTargetState() { + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1), + CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + // A worker running an older version wrote this target state; make sure we can handle it correctly + deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED_LEGACY); + deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED); + logOffset = 6; + + expectStart(existingRecords, deserialized); + + // Shouldn't see any callbacks since this is during startup + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // Should see a single connector with initial state paused + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0))); + assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1))); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testRestore() { + // Restoring data should notify only of the latest values after loading is complete. This also validates + // that inconsistent state is ignored. + + // Overwrite each type at least once to ensure we see the latest data after loading + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1), + CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()), + // Connector after root update should make it through, task update shouldn't + new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 8, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(8), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(4), CONNECTOR_CONFIG_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + deserialized.put(CONFIGS_SERIALIZED.get(6), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(7), CONNECTOR_CONFIG_STRUCTS.get(2)); + deserialized.put(CONFIGS_SERIALIZED.get(8), TASK_CONFIG_STRUCTS.get(1)); + logOffset = 9; + + expectStart(existingRecords, deserialized); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // Should see a single connector and its config should be the last one seen anywhere in the log + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(logOffset, configState.offset()); // Should always be next to be read, even if uncommitted + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] + assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); + // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected + assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0))); + // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] + assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); + assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1))); + assertEquals(9, (int) configState.taskCountRecord(CONNECTOR_IDS.get(1))); + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); + assertEquals(Collections.singleton("connector1"), configState.connectorsPendingFencing); + + // Shouldn't see any callbacks since this is during startup + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testRestoreConnectorDeletion() { + // Restoring data should notify only of the latest values after loading is complete. This also validates + // that inconsistent state is ignored. + + // Overwrite each type at least once to ensure we see the latest data after loading + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), + CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty())); + + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), null); + deserialized.put(CONFIGS_SERIALIZED.get(4), null); + deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + + logOffset = 6; + expectStart(existingRecords, deserialized); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // Should see a single connector and its config should be the last one seen anywhere in the log + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted + assertTrue(configState.connectors().isEmpty()); + + // Shouldn't see any callbacks since this is during startup + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testRestoreZeroTasks() { + // Restoring data should notify only of the latest values after loading is complete. This also validates + // that inconsistent state is ignored. + + // Overwrite each type at least once to ensure we see the latest data after loading + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), + // Connector after root update should make it through, task update shouldn't + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty())); + LinkedHashMap deserialized = new LinkedHashMap<>(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2)); + deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR); + logOffset = 8; + expectStart(existingRecords, deserialized); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // Should see a single connector and its config should be the last one seen anywhere in the log + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] + assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); + // Should see 0 tasks for that connector. + assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0))); + // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); + + // Shouldn't see any callbacks since this is during startup + configStorage.stop(); + verify(configLog).stop(); + } + @Test public void testRecordToRestartRequest() { ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), @@ -429,6 +895,72 @@ public class KafkaConfigBackingStoreMockitoTest { assertEquals(expectedClientId + "-leader", fencableProducerProps.get(CLIENT_ID_CONFIG)); } + @Test + public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() { + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(2); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start()); + assertTrue(e.getMessage().contains("required to have a single partition")); + } + + @Test + public void testFencableProducerPropertiesInsertedByDefault() { + props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); + String groupId = "my-connect-cluster"; + props.put(GROUP_ID_CONFIG, groupId); + props.remove(TRANSACTIONAL_ID_CONFIG); + props.remove(ENABLE_IDEMPOTENCE_CONFIG); + createStore(); + + Map fencableProducerProperties = configStorage.fencableProducerProps(config); + assertEquals("connect-cluster-" + groupId, fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG)); + assertEquals("true", fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG)); + } + + @Test + public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() { + props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + props.remove(ISOLATION_LEVEL_CONFIG); + createStore(); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + + assertEquals( + IsolationLevel.READ_COMMITTED.toString(), + capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG) + ); + } + + @Test + public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() { + props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString()); + createStore(); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + + assertEquals( + IsolationLevel.READ_COMMITTED.toString(), + capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG) + ); + } + + @Test + public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() { + props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); + props.remove(ISOLATION_LEVEL_CONFIG); + createStore(); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + + assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)); + } + private void verifyConfigure() { verify(configStorage).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(), capturedConsumerProps.capture(), capturedConsumedCallback.capture(), @@ -476,6 +1008,21 @@ public class KafkaConfigBackingStoreMockitoTest { }); } + private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) { + LinkedHashMap serializedData = new LinkedHashMap<>(); + serializedData.put(key, serializedValue); + expectRead(serializedData, Collections.singletonMap(key, deserializedValue)); + } + + private void expectRead(LinkedHashMap serializedValues, + Map deserializedValues) { + for (Map.Entry deserializedValueEntry : deserializedValues.entrySet()) { + byte[] serializedValue = serializedValues.get(deserializedValueEntry.getKey()); + when(converter.toConnectData(TOPIC, serializedValue)) + .thenReturn(new SchemaAndValue(null, structToMap(deserializedValueEntry.getValue()))); + } + } + // This map needs to maintain ordering private Answer> expectReadToEnd(final Map serializedConfigs) { return invocation -> { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index bca2a73f1c7..637dac4d679 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -19,12 +19,7 @@ package org.apache.kafka.connect.storage; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.IsolationLevel; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.ProducerFencedException; -import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.MockTime; @@ -33,7 +28,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -55,7 +49,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -65,24 +58,16 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG; -import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG; -import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG; import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME; import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME; -import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS; import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) @@ -138,13 +123,7 @@ public class KafkaConfigBackingStoreTest { new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)) ); - private static final List CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList( - new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6), - new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9) - ); private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED"); - private static final Struct TARGET_STATE_PAUSED_LEGACY = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0) - .put("state", "PAUSED"); private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1) .put("state", "PAUSED") .put("state.v2", "PAUSED"); @@ -155,9 +134,6 @@ public class KafkaConfigBackingStoreTest { private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2); - private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR - = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0); - private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false); private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true); private static final List RESTART_REQUEST_STRUCTS = Arrays.asList( @@ -172,10 +148,6 @@ public class KafkaConfigBackingStoreTest { "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes() ); - private static final List TARGET_STATES_SERIALIZED = Arrays.asList( - "started".getBytes(), "paused".getBytes(), "stopped".getBytes() - ); - @Mock private Converter converter; @Mock @@ -185,8 +157,6 @@ public class KafkaConfigBackingStoreTest { @Mock KafkaBasedLog storeLog; @Mock - Producer fencableProducer; - @Mock Future producerFuture; private KafkaConfigBackingStore configStorage; @@ -226,182 +196,6 @@ public class KafkaConfigBackingStoreTest { createStore(); } - @Test - public void testPutConnectorConfigProducerError() throws Exception { - expectConfigure(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); - - expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0)); - - storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().andReturn(producerFuture); - - producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject()); - EasyMock.expectLastCall().andThrow(new ExecutionException(new TopicAuthorizationException(Collections.singleton("test")))); - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Verify initial state - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(-1, configState.offset()); - assertEquals(0, configState.connectors().size()); - - // verify that the producer exception from KafkaBasedLog::send is propagated - ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), - SAMPLE_CONFIGS.get(0), null)); - assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka")); - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testRemoveConnectorConfigSlowProducer() throws Exception { - expectConfigure(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - expectPartitionCount(1); - - @SuppressWarnings("unchecked") - Future connectorConfigProducerFuture = PowerMock.createMock(Future.class); - // tombstone for the connector config - storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull()); - EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture); - - @SuppressWarnings("unchecked") - Future targetStateProducerFuture = PowerMock.createMock(Future.class); - // tombstone for the connector target state - storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull()); - EasyMock.expectLastCall().andReturn(targetStateProducerFuture); - - connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject()); - EasyMock.expectLastCall().andAnswer(() -> { - time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000); - return null; - }); - - // the future get timeout is expected to be reduced according to how long the previous Future::get took - targetStateProducerFuture.get(EasyMock.eq(1000L), EasyMock.anyObject()); - EasyMock.expectLastCall().andAnswer(() -> { - time.sleep(1000); - return null; - }); - - @SuppressWarnings("unchecked") - Future future = PowerMock.createMock(Future.class); - EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> future); - - // the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the - // timeout on the log read future to be 0 - EasyMock.expect(future.get(EasyMock.eq(0L), EasyMock.anyObject())).andReturn(null); - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - configStorage.removeConnectorConfig("test-connector"); - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testWritePrivileges() throws Exception { - // With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer - // to write some types of messages to the config topic - props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); - createStore(); - - expectConfigure(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - - // Try and fail to write a task count record to the config topic without write privileges - expectConvert(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0)); - // Claim write privileges - expectFencableProducer(); - // And write the task count record successfully - expectConvert(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0)); - fencableProducer.beginTransaction(); - EasyMock.expectLastCall(); - EasyMock.expect(fencableProducer.send(EasyMock.anyObject())).andReturn(null); - fencableProducer.commitTransaction(); - EasyMock.expectLastCall(); - expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)); - - // Try to write a connector config - expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1)); - fencableProducer.beginTransaction(); - EasyMock.expectLastCall(); - EasyMock.expect(fencableProducer.send(EasyMock.anyObject())).andReturn(null); - // Get fenced out - fencableProducer.commitTransaction(); - EasyMock.expectLastCall().andThrow(new ProducerFencedException("Better luck next time")); - fencableProducer.close(Duration.ZERO); - EasyMock.expectLastCall(); - // And fail when trying to write again without reclaiming write privileges - expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1)); - - // In the meantime, write a target state (which doesn't require write privileges) - expectConvert(KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1)); - storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1)); - EasyMock.expectLastCall().andReturn(producerFuture); - producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject()); - EasyMock.expectLastCall().andReturn(null); - - // Reclaim write privileges - expectFencableProducer(); - // And successfully write the config - expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1)); - fencableProducer.beginTransaction(); - EasyMock.expectLastCall(); - EasyMock.expect(fencableProducer.send(EasyMock.anyObject())).andReturn(null); - fencableProducer.commitTransaction(); - EasyMock.expectLastCall(); - expectConvertRead(CONNECTOR_CONFIG_KEYS.get(1), CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(2)); - configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(1)); - EasyMock.expectLastCall(); - - expectPartitionCount(1); - expectStop(); - fencableProducer.close(Duration.ZERO); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should fail the first time since we haven't claimed write privileges - assertThrows(IllegalStateException.class, () -> configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6)); - // Should succeed now - configStorage.claimWritePrivileges(); - configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6); - - // Should fail again when we get fenced out - assertThrows(PrivilegedWriteException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null)); - // Should fail if we retry without reclaiming write privileges - assertThrows(IllegalStateException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null)); - - // Should succeed even without write privileges (target states can be written by anyone) - configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED); - - // Should succeed if we re-claim write privileges - configStorage.claimWritePrivileges(); - configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - @Test public void testTaskCountRecordsAndGenerations() throws Exception { expectConfigure(); @@ -688,56 +482,6 @@ public class KafkaConfigBackingStoreTest { PowerMock.verifyAll(); } - @Test - public void testRestoreTargetState() throws Exception { - expectConfigure(); - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1), - CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - // A worker running an older version wrote this target state; make sure we can handle it correctly - deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED_LEGACY); - deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED); - logOffset = 6; - - expectStart(existingRecords, deserialized); - - // Shouldn't see any callbacks since this is during startup - - expectPartitionCount(1); - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should see a single connector with initial state paused - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0))); - assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1))); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - @Test public void testBackgroundUpdateTargetState() throws Exception { // verify that we handle target state changes correctly when they come up through the log @@ -913,236 +657,6 @@ public class KafkaConfigBackingStoreTest { PowerMock.verifyAll(); } - @Test - public void testRestoreTargetStateUnexpectedDeletion() throws Exception { - expectConfigure(); - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(3), null); - deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - logOffset = 5; - - expectStart(existingRecords, deserialized); - expectPartitionCount(1); - - // Shouldn't see any callbacks since this is during startup - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // The target state deletion should reset the state to STARTED - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testRestore() throws Exception { - // Restoring data should notify only of the latest values after loading is complete. This also validates - // that inconsistent state is ignored. - - expectConfigure(); - // Overwrite each type at least once to ensure we see the latest data after loading - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1), - CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()), - // Connector after root update should make it through, task update shouldn't - new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 8, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(8), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(3), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(4), CONNECTOR_CONFIG_STRUCTS.get(1)); - deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - deserialized.put(CONFIGS_SERIALIZED.get(6), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1)); - deserialized.put(CONFIGS_SERIALIZED.get(7), CONNECTOR_CONFIG_STRUCTS.get(2)); - deserialized.put(CONFIGS_SERIALIZED.get(8), TASK_CONFIG_STRUCTS.get(1)); - logOffset = 9; - expectStart(existingRecords, deserialized); - expectPartitionCount(1); - - // Shouldn't see any callbacks since this is during startup - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should see a single connector and its config should be the last one seen anywhere in the log - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(logOffset, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); - // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] - assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); - // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected - assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0))); - // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1))); - assertEquals(9, (int) configState.taskCountRecord(CONNECTOR_IDS.get(1))); - assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); - assertEquals(Collections.singleton("connector1"), configState.connectorsPendingFencing); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testRestoreConnectorDeletion() throws Exception { - // Restoring data should notify only of the latest values after loading is complete. This also validates - // that inconsistent state is ignored. - - expectConfigure(); - // Overwrite each type at least once to ensure we see the latest data after loading - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), - CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty())); - - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(3), null); - deserialized.put(CONFIGS_SERIALIZED.get(4), null); - deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - - logOffset = 6; - expectStart(existingRecords, deserialized); - expectPartitionCount(1); - - // Shouldn't see any callbacks since this is during startup - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should see a single connector and its config should be the last one seen anywhere in the log - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted - assertTrue(configState.connectors().isEmpty()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testRestoreZeroTasks() throws Exception { - // Restoring data should notify only of the latest values after loading is complete. This also validates - // that inconsistent state is ignored. - expectConfigure(); - // Overwrite each type at least once to ensure we see the latest data after loading - List> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), - CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), - // Connector after root update should make it through, task update shouldn't - new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), - CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty())); - LinkedHashMap deserialized = new LinkedHashMap<>(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1)); - deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2)); - deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1)); - deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR); - logOffset = 8; - expectStart(existingRecords, deserialized); - expectPartitionCount(1); - - // Shouldn't see any callbacks since this is during startup - - expectStop(); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - configStorage.start(); - - // Should see a single connector and its config should be the last one seen anywhere in the log - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] - assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); - // Should see 0 tasks for that connector. - assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0))); - // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] - assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - @Test public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception { // Test a case where a failure and compaction has left us in an inconsistent state when reading the log. @@ -1370,94 +884,6 @@ public class KafkaConfigBackingStoreTest { PowerMock.verifyAll(); } - @Test - public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception { - expectConfigure(); - expectStart(Collections.emptyList(), Collections.emptyMap()); - - expectPartitionCount(2); - - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start()); - assertTrue(e.getMessage().contains("required to have a single partition")); - - PowerMock.verifyAll(); - } - - @Test - public void testFencableProducerPropertiesInsertedByDefault() throws Exception { - props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); - String groupId = "my-connect-cluster"; - props.put(GROUP_ID_CONFIG, groupId); - props.remove(TRANSACTIONAL_ID_CONFIG); - props.remove(ENABLE_IDEMPOTENCE_CONFIG); - createStore(); - - PowerMock.replayAll(); - - Map fencableProducerProperties = configStorage.fencableProducerProps(config); - assertEquals("connect-cluster-" + groupId, fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG)); - assertEquals("true", fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG)); - - PowerMock.verifyAll(); - } - - @Test - public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() throws Exception { - props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); - props.remove(ISOLATION_LEVEL_CONFIG); - createStore(); - - expectConfigure(); - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - - assertEquals( - IsolationLevel.READ_COMMITTED.toString(), - capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG) - ); - - PowerMock.verifyAll(); - } - - @Test - public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() throws Exception { - props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); - props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString()); - createStore(); - - expectConfigure(); - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - - assertEquals( - IsolationLevel.READ_COMMITTED.toString(), - capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG) - ); - - PowerMock.verifyAll(); - } - - @Test - public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() throws Exception { - props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); - props.remove(ISOLATION_LEVEL_CONFIG); - createStore(); - - expectConfigure(); - PowerMock.replayAll(); - - configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); - - assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)); - - PowerMock.verifyAll(); - } - private void expectConfigure() throws Exception { PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), @@ -1467,13 +893,6 @@ public class KafkaConfigBackingStoreTest { .andReturn(storeLog); } - private void expectFencableProducer() throws Exception { - fencableProducer.initTransactions(); - EasyMock.expectLastCall(); - PowerMock.expectPrivate(configStorage, "createFencableProducer") - .andReturn(fencableProducer); - } - private void expectPartitionCount(int partitionCount) { EasyMock.expect(storeLog.partitionCount()) .andReturn(partitionCount); @@ -1516,11 +935,6 @@ public class KafkaConfigBackingStoreTest { expectRead(serializedData, Collections.singletonMap(key, deserializedValue)); } - private void expectConvert(Schema valueSchema, Struct valueStruct, byte[] serialized) { - EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.eq(valueStruct))) - .andReturn(serialized); - } - // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back // from the log. Validate the data that is captured when the conversion is performed matches the specified data // (by checking a single field's value) @@ -1546,14 +960,6 @@ public class KafkaConfigBackingStoreTest { }); } - private void expectConvertRead(final String configKey, final Struct struct, final byte[] serialized) { - EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized))) - .andAnswer(() -> new SchemaAndValue(null, serialized == null ? null : structToMap(struct))); - LinkedHashMap recordsToRead = new LinkedHashMap<>(); - recordsToRead.put(configKey, serialized); - expectReadToEnd(recordsToRead); - } - // This map needs to maintain ordering private void expectReadToEnd(final LinkedHashMap serializedConfigs) { EasyMock.expect(storeLog.readToEnd()) @@ -1569,16 +975,6 @@ public class KafkaConfigBackingStoreTest { }); } - private void expectConnectorRemoval(String configKey, String targetStateKey) throws Exception { - expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null); - expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null); - - LinkedHashMap recordsToRead = new LinkedHashMap<>(); - recordsToRead.put(configKey, null); - recordsToRead.put(targetStateKey, null); - expectReadToEnd(recordsToRead); - } - private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized, final String dataFieldName, final Object dataFieldValue) throws Exception { expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);