KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) (#15841)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Hector Geraldino 2024-05-05 20:47:17 -04:00 committed by GitHub
parent 970ac07881
commit 41f5bf844d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 554 additions and 611 deletions

View File

@ -20,9 +20,14 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
@ -31,11 +36,13 @@ import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TestFuture;
import org.apache.kafka.connect.util.TopicAdmin;
@ -48,6 +55,8 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -55,6 +64,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -67,18 +77,25 @@ import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -102,13 +119,31 @@ public class KafkaConfigBackingStoreMockitoTest {
private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
private static final List<String> TARGET_STATE_KEYS = Arrays.asList("target-state-connector1", "target-state-connector2");
private static final List<String> CONNECTOR_TASK_COUNT_RECORD_KEYS = Arrays.asList("tasks-fencing-connector1", "tasks-fencing-connector2");
private static final String CONNECTOR_1_NAME = "connector1";
private static final String CONNECTOR_2_NAME = "connector2";
private static final List<String> RESTART_CONNECTOR_KEYS = Arrays.asList(RESTART_KEY(CONNECTOR_1_NAME), RESTART_KEY(CONNECTOR_2_NAME));
// Need a) connector with multiple tasks and b) multiple connectors
private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
new ConnectorTaskId("connector1", 0),
new ConnectorTaskId("connector1", 1),
new ConnectorTaskId("connector2", 0)
);
private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
// Need some placeholders -- the contents don't matter here, just that they are restored properly
private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
Collections.singletonMap("config-key-one", "config-value-one"),
Collections.singletonMap("config-key-two", "config-value-two"),
Collections.singletonMap("config-key-three", "config-value-three")
);
private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
);
private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false);
private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true);
private static final List<Struct> RESTART_REQUEST_STRUCTS = Arrays.asList(
@ -116,12 +151,23 @@ public class KafkaConfigBackingStoreMockitoTest {
ONLY_FAILED_MISSING_STRUCT,
INCLUDE_TASKS_MISSING_STRUCT);
private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
);
// Need some placeholders -- the contents don't matter here, just that they are restored properly
private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
Collections.singletonMap("config-key-one", "config-value-one"),
Collections.singletonMap("config-key-two", "config-value-two"),
Collections.singletonMap("config-key-three", "config-value-three")
private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "PAUSED");
private static final Struct TARGET_STATE_PAUSED_LEGACY = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0)
.put("state", "PAUSED");
private static final Struct TARGET_STATE_STOPPED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "STOPPED");
private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
);
// The exact format doesn't matter here since both conversions are mocked
@ -130,6 +176,11 @@ public class KafkaConfigBackingStoreMockitoTest {
"config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
"config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
);
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
private static final List<byte[]> TARGET_STATES_SERIALIZED = Arrays.asList(
"started".getBytes(), "paused".getBytes(), "stopped".getBytes()
@ -143,6 +194,8 @@ public class KafkaConfigBackingStoreMockitoTest {
@Mock
KafkaBasedLog<String, byte[]> configLog;
@Mock
Producer<String, byte[]> fencableProducer;
@Mock
Future<RecordMetadata> producerFuture;
private KafkaConfigBackingStore configStorage;
@ -347,6 +400,419 @@ public class KafkaConfigBackingStoreMockitoTest {
verify(configLog).stop();
}
@Test
public void testPutConnectorConfigProducerError() throws Exception {
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0)))
.thenReturn(CONFIGS_SERIALIZED.get(0));
when(configLog.sendWithReceipt(anyString(), any(byte[].class))).thenReturn(producerFuture);
// Verify initial state
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
assertEquals(0, configState.connectors().size());
Exception thrownException = new ExecutionException(new TopicAuthorizationException(Collections.singleton("test")));
when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(thrownException);
// verify that the producer exception from KafkaBasedLog::send is propagated
ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null));
assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
assertEquals(thrownException, e.getCause());
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testRemoveConnectorConfigSlowProducer() throws Exception {
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
@SuppressWarnings("unchecked")
Future<RecordMetadata> connectorConfigProducerFuture = mock(Future.class);
@SuppressWarnings("unchecked")
Future<RecordMetadata> targetStateProducerFuture = mock(Future.class);
when(configLog.sendWithReceipt(anyString(), isNull()))
// tombstone for the connector config
.thenReturn(connectorConfigProducerFuture)
// tombstone for the connector target state
.thenReturn(targetStateProducerFuture);
when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS), any(TimeUnit.class)))
.thenAnswer((Answer<RecordMetadata>) invocation -> {
time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
return null;
});
// the future get timeout is expected to be reduced according to how long the previous Future::get took
when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class)))
.thenAnswer((Answer<RecordMetadata>) invocation -> {
time.sleep(1000);
return null;
});
@SuppressWarnings("unchecked")
Future<Void> future = mock(Future.class);
when(configLog.readToEnd()).thenReturn(future);
// the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the
// timeout on the log read future to be 0
when(future.get(eq(0L), any(TimeUnit.class))).thenReturn(null);
configStorage.removeConnectorConfig("test-connector");
configStorage.stop();
verify(configLog).stop();
}
@Test
@SuppressWarnings("unchecked")
public void testWritePrivileges() throws Exception {
// With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer
// to write some types of messages to the config topic
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
createStore();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
// Try and fail to write a task count record to the config topic without write privileges
when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)))
.thenReturn(CONFIGS_SERIALIZED.get(0));
// Should fail the first time since we haven't claimed write privileges
assertThrows(IllegalStateException.class, () -> configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6));
// Claim write privileges
doReturn(fencableProducer).when(configStorage).createFencableProducer();
// And write the task count record successfully
when(fencableProducer.send(any(ProducerRecord.class))).thenReturn(null);
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0))))
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2))))
.when(configLog).readToEnd();
expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
// Should succeed now
configStorage.claimWritePrivileges();
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
verify(fencableProducer).beginTransaction();
verify(fencableProducer).commitTransaction();
// Try to write a connector config
when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0)))
.thenReturn(CONFIGS_SERIALIZED.get(1));
// Get fenced out
doThrow(new ProducerFencedException("Better luck next time"))
.doNothing()
.when(fencableProducer).commitTransaction();
// Should fail again when we get fenced out
assertThrows(PrivilegedWriteException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null));
verify(fencableProducer, times(2)).beginTransaction();
verify(fencableProducer).close(Duration.ZERO);
// Should fail if we retry without reclaiming write privileges
assertThrows(IllegalStateException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null));
// In the meantime, write a target state (which doesn't require write privileges)
when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED))
.thenReturn(CONFIGS_SERIALIZED.get(1));
when(configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1)))
.thenReturn(producerFuture);
when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(null);
// Should succeed even without write privileges (target states can be written by anyone)
configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
// Reclaim write privileges and successfully write the config
when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2)))
.thenReturn(new SchemaAndValue(null, structToMap(CONNECTOR_CONFIG_STRUCTS.get(0))));
// Should succeed if we re-claim write privileges
configStorage.claimWritePrivileges();
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null);
verify(fencableProducer, times(3)).beginTransaction();
verify(fencableProducer, times(3)).commitTransaction();
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
configStorage.stop();
verify(configLog).stop();
verify(configStorage, times(2)).createFencableProducer();
verify(fencableProducer, times(2)).close(Duration.ZERO);
}
@Test
public void testRestoreTargetStateUnexpectedDeletion() {
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), null);
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
// The target state deletion should reset the state to STARTED
ClusterConfigState configState = configStorage.snapshot();
assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testRestoreTargetState() {
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
// A worker running an older version wrote this target state; make sure we can handle it correctly
deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED_LEGACY);
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
logOffset = 6;
expectStart(existingRecords, deserialized);
// Shouldn't see any callbacks since this is during startup
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testRestore() {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state is ignored.
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1),
CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
// Connector after root update should make it through, task update shouldn't
new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 8, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(8), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(4), CONNECTOR_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(6), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(7), CONNECTOR_CONFIG_STRUCTS.get(2));
deserialized.put(CONFIGS_SERIALIZED.get(8), TASK_CONFIG_STRUCTS.get(1));
logOffset = 9;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot();
assertEquals(logOffset, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
// CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
// Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
assertEquals(9, (int) configState.taskCountRecord(CONNECTOR_IDS.get(1)));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
assertEquals(Collections.singleton("connector1"), configState.connectorsPendingFencing);
// Shouldn't see any callbacks since this is during startup
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testRestoreConnectorDeletion() {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state is ignored.
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), null);
deserialized.put(CONFIGS_SERIALIZED.get(4), null);
deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 6;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted
assertTrue(configState.connectors().isEmpty());
// Shouldn't see any callbacks since this is during startup
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testRestoreZeroTasks() {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state is ignored.
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
// Connector after root update should make it through, task update shouldn't
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
logOffset = 8;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot();
assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
// CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
// Should see 0 tasks for that connector.
assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
// Shouldn't see any callbacks since this is during startup
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testRecordToRestartRequest() {
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
@ -429,6 +895,72 @@ public class KafkaConfigBackingStoreMockitoTest {
assertEquals(expectedClientId + "-leader", fencableProducerProps.get(CLIENT_ID_CONFIG));
}
@Test
public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() {
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(2);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
assertTrue(e.getMessage().contains("required to have a single partition"));
}
@Test
public void testFencableProducerPropertiesInsertedByDefault() {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
String groupId = "my-connect-cluster";
props.put(GROUP_ID_CONFIG, groupId);
props.remove(TRANSACTIONAL_ID_CONFIG);
props.remove(ENABLE_IDEMPOTENCE_CONFIG);
createStore();
Map<String, Object> fencableProducerProperties = configStorage.fencableProducerProps(config);
assertEquals("connect-cluster-" + groupId, fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
assertEquals("true", fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
}
@Test
public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
props.remove(ISOLATION_LEVEL_CONFIG);
createStore();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
assertEquals(
IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
}
@Test
public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
createStore();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
assertEquals(
IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
}
@Test
public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
props.remove(ISOLATION_LEVEL_CONFIG);
createStore();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG));
}
private void verifyConfigure() {
verify(configStorage).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
@ -476,6 +1008,21 @@ public class KafkaConfigBackingStoreMockitoTest {
});
}
private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) {
LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
serializedData.put(key, serializedValue);
expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
}
private void expectRead(LinkedHashMap<String, byte[]> serializedValues,
Map<String, Struct> deserializedValues) {
for (Map.Entry<String, Struct> deserializedValueEntry : deserializedValues.entrySet()) {
byte[] serializedValue = serializedValues.get(deserializedValueEntry.getKey());
when(converter.toConnectData(TOPIC, serializedValue))
.thenReturn(new SchemaAndValue(null, structToMap(deserializedValueEntry.getValue())));
}
}
// This map needs to maintain ordering
private Answer<Future<Void>> expectReadToEnd(final Map<String, byte[]> serializedConfigs) {
return invocation -> {

View File

@ -19,12 +19,7 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
@ -33,7 +28,6 @@ import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
@ -55,7 +49,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -65,24 +58,16 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@ -138,13 +123,7 @@ public class KafkaConfigBackingStoreTest {
new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
);
private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
);
private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
private static final Struct TARGET_STATE_PAUSED_LEGACY = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0)
.put("state", "PAUSED");
private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "PAUSED");
@ -155,9 +134,6 @@ public class KafkaConfigBackingStoreTest {
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false);
private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true);
private static final List<Struct> RESTART_REQUEST_STRUCTS = Arrays.asList(
@ -172,10 +148,6 @@ public class KafkaConfigBackingStoreTest {
"config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
);
private static final List<byte[]> TARGET_STATES_SERIALIZED = Arrays.asList(
"started".getBytes(), "paused".getBytes(), "stopped".getBytes()
);
@Mock
private Converter converter;
@Mock
@ -185,8 +157,6 @@ public class KafkaConfigBackingStoreTest {
@Mock
KafkaBasedLog<String, byte[]> storeLog;
@Mock
Producer<String, byte[]> fencableProducer;
@Mock
Future<RecordMetadata> producerFuture;
private KafkaConfigBackingStore configStorage;
@ -226,182 +196,6 @@ public class KafkaConfigBackingStoreTest {
createStore();
}
@Test
public void testPutConnectorConfigProducerError() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(1);
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"))));
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Verify initial state
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
assertEquals(0, configState.connectors().size());
// verify that the producer exception from KafkaBasedLog::send is propagated
ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null));
assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testRemoveConnectorConfigSlowProducer() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(1);
@SuppressWarnings("unchecked")
Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
// tombstone for the connector config
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
@SuppressWarnings("unchecked")
Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
// tombstone for the connector target state
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
return null;
});
// the future get timeout is expected to be reduced according to how long the previous Future::get took
targetStateProducerFuture.get(EasyMock.eq(1000L), EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
time.sleep(1000);
return null;
});
@SuppressWarnings("unchecked")
Future<Void> future = PowerMock.createMock(Future.class);
EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> future);
// the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the
// timeout on the log read future to be 0
EasyMock.expect(future.get(EasyMock.eq(0L), EasyMock.anyObject())).andReturn(null);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
configStorage.removeConnectorConfig("test-connector");
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testWritePrivileges() throws Exception {
// With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer
// to write some types of messages to the config topic
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
createStore();
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
// Try and fail to write a task count record to the config topic without write privileges
expectConvert(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
// Claim write privileges
expectFencableProducer();
// And write the task count record successfully
expectConvert(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
fencableProducer.beginTransaction();
EasyMock.expectLastCall();
EasyMock.expect(fencableProducer.send(EasyMock.anyObject())).andReturn(null);
fencableProducer.commitTransaction();
EasyMock.expectLastCall();
expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
// Try to write a connector config
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
fencableProducer.beginTransaction();
EasyMock.expectLastCall();
EasyMock.expect(fencableProducer.send(EasyMock.anyObject())).andReturn(null);
// Get fenced out
fencableProducer.commitTransaction();
EasyMock.expectLastCall().andThrow(new ProducerFencedException("Better luck next time"));
fencableProducer.close(Duration.ZERO);
EasyMock.expectLastCall();
// And fail when trying to write again without reclaiming write privileges
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
// In the meantime, write a target state (which doesn't require write privileges)
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(null);
// Reclaim write privileges
expectFencableProducer();
// And successfully write the config
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
fencableProducer.beginTransaction();
EasyMock.expectLastCall();
EasyMock.expect(fencableProducer.send(EasyMock.anyObject())).andReturn(null);
fencableProducer.commitTransaction();
EasyMock.expectLastCall();
expectConvertRead(CONNECTOR_CONFIG_KEYS.get(1), CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(2));
configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
EasyMock.expectLastCall();
expectPartitionCount(1);
expectStop();
fencableProducer.close(Duration.ZERO);
EasyMock.expectLastCall();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should fail the first time since we haven't claimed write privileges
assertThrows(IllegalStateException.class, () -> configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6));
// Should succeed now
configStorage.claimWritePrivileges();
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
// Should fail again when we get fenced out
assertThrows(PrivilegedWriteException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null));
// Should fail if we retry without reclaiming write privileges
assertThrows(IllegalStateException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null));
// Should succeed even without write privileges (target states can be written by anyone)
configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
// Should succeed if we re-claim write privileges
configStorage.claimWritePrivileges();
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null);
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testTaskCountRecordsAndGenerations() throws Exception {
expectConfigure();
@ -688,56 +482,6 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
@Test
public void testRestoreTargetState() throws Exception {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
// A worker running an older version wrote this target state; make sure we can handle it correctly
deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED_LEGACY);
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
logOffset = 6;
expectStart(existingRecords, deserialized);
// Shouldn't see any callbacks since this is during startup
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testBackgroundUpdateTargetState() throws Exception {
// verify that we handle target state changes correctly when they come up through the log
@ -913,236 +657,6 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
@Test
public void testRestoreTargetStateUnexpectedDeletion() throws Exception {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), null);
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// The target state deletion should reset the state to STARTED
ClusterConfigState configState = configStorage.snapshot();
assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testRestore() throws Exception {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state is ignored.
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1),
CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
// Connector after root update should make it through, task update shouldn't
new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 8, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(8), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(4), CONNECTOR_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(6), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(7), CONNECTOR_CONFIG_STRUCTS.get(2));
deserialized.put(CONFIGS_SERIALIZED.get(8), TASK_CONFIG_STRUCTS.get(1));
logOffset = 9;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot();
assertEquals(logOffset, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
// CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
// Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
assertEquals(9, (int) configState.taskCountRecord(CONNECTOR_IDS.get(1)));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
assertEquals(Collections.singleton("connector1"), configState.connectorsPendingFencing);
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testRestoreConnectorDeletion() throws Exception {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state is ignored.
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), null);
deserialized.put(CONFIGS_SERIALIZED.get(4), null);
deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 6;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted
assertTrue(configState.connectors().isEmpty());
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testRestoreZeroTasks() throws Exception {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state is ignored.
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
// Connector after root update should make it through, task update shouldn't
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
logOffset = 8;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot();
assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
// CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
// Should see 0 tasks for that connector.
assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
// Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
@ -1370,94 +884,6 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
@Test
public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectPartitionCount(2);
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
assertTrue(e.getMessage().contains("required to have a single partition"));
PowerMock.verifyAll();
}
@Test
public void testFencableProducerPropertiesInsertedByDefault() throws Exception {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
String groupId = "my-connect-cluster";
props.put(GROUP_ID_CONFIG, groupId);
props.remove(TRANSACTIONAL_ID_CONFIG);
props.remove(ENABLE_IDEMPOTENCE_CONFIG);
createStore();
PowerMock.replayAll();
Map<String, Object> fencableProducerProperties = configStorage.fencableProducerProps(config);
assertEquals("connect-cluster-" + groupId, fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
assertEquals("true", fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
PowerMock.verifyAll();
}
@Test
public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() throws Exception {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
props.remove(ISOLATION_LEVEL_CONFIG);
createStore();
expectConfigure();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
assertEquals(
IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
PowerMock.verifyAll();
}
@Test
public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() throws Exception {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
createStore();
expectConfigure();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
assertEquals(
IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
PowerMock.verifyAll();
}
@Test
public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() throws Exception {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
props.remove(ISOLATION_LEVEL_CONFIG);
createStore();
expectConfigure();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG));
PowerMock.verifyAll();
}
private void expectConfigure() throws Exception {
PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@ -1467,13 +893,6 @@ public class KafkaConfigBackingStoreTest {
.andReturn(storeLog);
}
private void expectFencableProducer() throws Exception {
fencableProducer.initTransactions();
EasyMock.expectLastCall();
PowerMock.expectPrivate(configStorage, "createFencableProducer")
.andReturn(fencableProducer);
}
private void expectPartitionCount(int partitionCount) {
EasyMock.expect(storeLog.partitionCount())
.andReturn(partitionCount);
@ -1516,11 +935,6 @@ public class KafkaConfigBackingStoreTest {
expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
}
private void expectConvert(Schema valueSchema, Struct valueStruct, byte[] serialized) {
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.eq(valueStruct)))
.andReturn(serialized);
}
// Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
// from the log. Validate the data that is captured when the conversion is performed matches the specified data
// (by checking a single field's value)
@ -1546,14 +960,6 @@ public class KafkaConfigBackingStoreTest {
});
}
private void expectConvertRead(final String configKey, final Struct struct, final byte[] serialized) {
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
.andAnswer(() -> new SchemaAndValue(null, serialized == null ? null : structToMap(struct)));
LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
recordsToRead.put(configKey, serialized);
expectReadToEnd(recordsToRead);
}
// This map needs to maintain ordering
private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
EasyMock.expect(storeLog.readToEnd())
@ -1569,16 +975,6 @@ public class KafkaConfigBackingStoreTest {
});
}
private void expectConnectorRemoval(String configKey, String targetStateKey) throws Exception {
expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
recordsToRead.put(configKey, null);
recordsToRead.put(targetStateKey, null);
expectReadToEnd(recordsToRead);
}
private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
final String dataFieldName, final Object dataFieldValue) throws Exception {
expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);