KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (#15933)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Authored by PoAn Yang on 2024-05-18 04:01:27 +08:00; committed by GitHub
parent 22f5113dba
commit 93a5efc4b7
2 changed files with 321 additions and 355 deletions
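In outline, the change swaps EasyMock/PowerMock's record-replay-verify flow for Mockito's stub-and-verify flow. The snippet below is a simplified illustration of that pattern only ("some-connector" is a placeholder), not the literal test code from this commit:

// Before (EasyMock/PowerMock record-replay-verify style, as in the old test):
//   EasyMock.expect(configLog.partitionCount()).andReturn(1);
//   configUpdateListener.onConnectorConfigUpdate("some-connector");
//   EasyMock.expectLastCall();
//   PowerMock.replayAll();
//   // ... exercise the store ...
//   PowerMock.verifyAll();

// After (Mockito stub-and-verify style, as in the new test):
when(configLog.partitionCount()).thenReturn(1);
// ... exercise the store ...
verify(configUpdateListener).onConnectorConfigUpdate("some-connector");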

KafkaConfigBackingStoreMockitoTest.java

@@ -38,17 +38,18 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.RestartRequest;
+import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.TestFuture;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -64,6 +65,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -86,6 +88,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -235,8 +238,7 @@ public class KafkaConfigBackingStoreMockitoTest {
props.put("config.storage.max.message.bytes", "1001");
createStore();
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
@@ -265,8 +267,8 @@ public class KafkaConfigBackingStoreMockitoTest {
props.put("config.storage.min.insync.replicas", "3");
props.put("config.storage.max.message.bytes", "1001");
createStore();
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -285,8 +287,7 @@ public class KafkaConfigBackingStoreMockitoTest {
@Test
public void testPutConnectorConfig() throws Exception {
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -337,8 +338,10 @@ public class KafkaConfigBackingStoreMockitoTest {
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
// Config deletion
-expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
-expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
+when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(null);
+when(converter.toConnectData(TOPIC, null)).thenReturn(new SchemaAndValue(null, null));
+when(configLog.sendWithReceipt(AdditionalMatchers.or(Mockito.eq(configKey), Mockito.eq(targetStateKey)),
+        Mockito.isNull())).thenReturn(producerFuture);
// Deletion should remove the second one we added
configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
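The sendWithReceipt stub above uses Mockito's AdditionalMatchers.or so that one stubbing covers both tombstone keys written during connector deletion. A minimal, self-contained sketch of that matcher combination, using a mocked java.util.Map and made-up keys rather than the test's KafkaBasedLog:

import java.util.Map;
import static org.mockito.AdditionalMatchers.or;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@SuppressWarnings("unchecked")
Map<String, byte[]> store = mock(Map.class);
// One stub matches either key, so both lookups below hit the same expectation.
when(store.get(or(eq("connector-config-key"), eq("target-state-key")))).thenReturn(null);
byte[] a = store.get("connector-config-key");   // matched
byte[] b = store.get("target-state-key");       // matched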
@@ -356,8 +359,7 @@ public class KafkaConfigBackingStoreMockitoTest {
@Test
public void testPutConnectorConfigWithTargetState() throws Exception {
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -402,8 +404,7 @@ public class KafkaConfigBackingStoreMockitoTest {
@Test
public void testPutConnectorConfigProducerError() throws Exception {
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -433,8 +434,7 @@ public class KafkaConfigBackingStoreMockitoTest {
@Test
public void testRemoveConnectorConfigSlowProducer() throws Exception {
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -486,8 +486,7 @@ public class KafkaConfigBackingStoreMockitoTest {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
createStore();
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -507,7 +506,8 @@ public class KafkaConfigBackingStoreMockitoTest {
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0))))
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2))))
.when(configLog).readToEnd();
-expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
+when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0)))
+        .thenReturn(new SchemaAndValue(null, structToMap(CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))));
// Should succeed now
configStorage.claimWritePrivileges();
@@ -583,7 +583,7 @@ public class KafkaConfigBackingStoreMockitoTest {
logOffset = 5;
expectStart(existingRecords, deserialized);
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -592,7 +592,7 @@ public class KafkaConfigBackingStoreMockitoTest {
// The target state deletion should reset the state to STARTED
ClusterConfigState configState = configStorage.snapshot();
assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted
-assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
configStorage.stop();
@@ -627,7 +627,7 @@ public class KafkaConfigBackingStoreMockitoTest {
expectStart(existingRecords, deserialized);
// Shouldn't see any callbacks since this is during startup
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -636,7 +636,7 @@ public class KafkaConfigBackingStoreMockitoTest {
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted
-assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
@@ -683,7 +683,7 @@ public class KafkaConfigBackingStoreMockitoTest {
logOffset = 9;
expectStart(existingRecords, deserialized);
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -692,7 +692,7 @@ public class KafkaConfigBackingStoreMockitoTest {
// Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot();
assertEquals(logOffset, configState.offset()); // Should always be next to be read, even if uncommitted
-assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
// CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
@@ -740,7 +740,7 @@ public class KafkaConfigBackingStoreMockitoTest {
logOffset = 6;
expectStart(existingRecords, deserialized);
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -791,7 +791,7 @@ public class KafkaConfigBackingStoreMockitoTest {
deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
logOffset = 8;
expectStart(existingRecords, deserialized);
-expectPartitionCount(1);
+when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
@@ -897,8 +897,7 @@ public class KafkaConfigBackingStoreMockitoTest {
@Test
public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() {
-expectStart(Collections.emptyList(), Collections.emptyMap());
-expectPartitionCount(2);
+when(configLog.partitionCount()).thenReturn(2);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
@@ -961,6 +960,291 @@ public class KafkaConfigBackingStoreMockitoTest {
assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG));
}
@Test
public void testBackgroundConnectorDeletion() throws Exception {
// verify that we handle connector deletions correctly when they come up through the log
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserialized);
when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
verify(configLog).start();
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));
LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedData.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1));
doAnswer(expectReadToEnd(serializedData)).when(configLog).readToEnd();
Map<String, Struct> deserializedData = new HashMap<>();
deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null);
deserializedData.put(TARGET_STATE_KEYS.get(0), null);
expectRead(serializedData, deserializedData);
configStorage.refresh(0, TimeUnit.SECONDS);
verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(0));
configState = configStorage.snapshot();
// Connector should now be removed from the snapshot
assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
// Ensure that the deleted connector's deferred task updates have been cleaned up
// in order to prevent unbounded growth of the map
assertEquals(Collections.emptyMap(), configStorage.deferredTaskUpdates);
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
// Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
// We start out by loading an initial configuration where we started to write a task update, and then
// compaction cleaned up the earlier record.
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
// This is the record that has been compacted:
//new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
logOffset = 6;
expectStart(existingRecords, deserialized);
when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
// After reading the log, it should have been in an inconsistent state
ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
// Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
assertNull(configState.taskConfig(TASK_IDS.get(0)));
assertNull(configState.taskConfig(TASK_IDS.get(1)));
assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
// Records to be read by consumer as it reads to the end of the log
LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
// Successful attempt to write new task config
doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(serializedConfigs))
.when(configLog).readToEnd();
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
"tasks", 1); // Updated to just 1 task
// Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
// we are going to shrink the number of tasks to 1
configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0)));
// Validate updated config
configState = configStorage.snapshot();
// This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
// to the topic. Only the last call with 1 task config + 1 commit actually gets written.
assertEquals(8, configState.offset());
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
verify(configUpdateListener).onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(0)));
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testPutRestartRequestOnlyFailed() throws Exception {
RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, false);
testPutRestartRequest(restartRequest);
}
@Test
public void testPutRestartRequestOnlyFailedIncludingTasks() throws Exception {
RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, true);
testPutRestartRequest(restartRequest);
}
private void testPutRestartRequest(RestartRequest restartRequest) throws Exception {
expectStart(Collections.emptyList(), Collections.emptyMap());
when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
verify(configLog).start();
expectConvertWriteRead(
RESTART_CONNECTOR_KEYS.get(0), KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0),
ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
recordsToRead.put(RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
doAnswer(expectReadToEnd(recordsToRead)).when(configLog).readToEnd();
// Writing should block until it is written and read back from Kafka
configStorage.putRestartRequest(restartRequest);
final ArgumentCaptor<RestartRequest> restartRequestCaptor = ArgumentCaptor.forClass(RestartRequest.class);
verify(configUpdateListener).onRestartRequest(restartRequestCaptor.capture());
assertEquals(restartRequest.connectorName(), restartRequestCaptor.getValue().connectorName());
assertEquals(restartRequest.onlyFailed(), restartRequestCaptor.getValue().onlyFailed());
assertEquals(restartRequest.includeTasks(), restartRequestCaptor.getValue().includeTasks());
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testRestoreRestartRequestInconsistentState() {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state doesn't prevent startup.
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), RESTART_REQUEST_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), RESTART_REQUEST_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(2), RESTART_REQUEST_STRUCTS.get(2));
deserialized.put(CONFIGS_SERIALIZED.get(3), null);
logOffset = 4;
expectStart(existingRecords, deserialized);
when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
verify(configLog).start();
// Shouldn't see any callbacks since this is during startup
verify(configUpdateListener, never()).onConnectorConfigRemove(anyString());
verify(configUpdateListener, never()).onConnectorConfigUpdate(anyString());
verify(configUpdateListener, never()).onTaskConfigUpdate(anyCollection());
verify(configUpdateListener, never()).onConnectorTargetStateChange(anyString());
verify(configUpdateListener, never()).onSessionKeyUpdate(any(SessionKey.class));
verify(configUpdateListener, never()).onRestartRequest(any(RestartRequest.class));
verify(configUpdateListener, never()).onLoggingLevelUpdate(anyString(), anyString());
configStorage.stop();
verify(configLog).stop();
}
@Test
public void testPutLogLevel() throws Exception {
final String logger1 = "org.apache.zookeeper";
final String logger2 = "org.apache.cassandra";
final String logger3 = "org.apache.kafka.clients";
final String logger4 = "org.apache.kafka.connect";
final String level1 = "ERROR";
final String level3 = "WARN";
final String level4 = "DEBUG";
final Struct existingLogLevel = new Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0)
.put("level", level1);
// Pre-populate the config topic with a couple of logger level records; these should be ignored (i.e.,
// not reported to the update listener)
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger1,
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()
),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger2,
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()
)
);
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), existingLogLevel);
// Make sure we gracefully handle tombstones
deserialized.put(CONFIGS_SERIALIZED.get(1), null);
logOffset = 2;
expectStart(existingRecords, deserialized);
when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
verify(configLog).start();
expectConvertWriteRead(
"logger-cluster-" + logger3, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2),
"level", level3);
configStorage.putLoggerLevel(logger3, level3);
expectConvertWriteRead(
"logger-cluster-" + logger4, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3),
"level", level4);
configStorage.putLoggerLevel(logger4, level4);
LinkedHashMap<String, byte[]> newRecords = new LinkedHashMap<>();
newRecords.put("logger-cluster-" + logger3, CONFIGS_SERIALIZED.get(2));
newRecords.put("logger-cluster-" + logger4, CONFIGS_SERIALIZED.get(3));
doAnswer(expectReadToEnd(newRecords)).when(configLog).readToEnd();
configStorage.refresh(0, TimeUnit.SECONDS);
verify(configUpdateListener).onLoggingLevelUpdate(logger3, level3);
verify(configUpdateListener).onLoggingLevelUpdate(logger4, level4);
configStorage.stop();
verify(configLog).stop();
}
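Several of these new tests share one shape: configLog.readToEnd() is stubbed with doAnswer so that reading to the end of the log replays a record through the consumer callback captured in verifyConfigure(), after which the test calls refresh() or a write method and verifies the listener. A condensed, hypothetical sketch of that stubbing (consumerRecord stands in for whichever record a given test replays):

doAnswer(invocation -> {
    // Replay one record through the captured callback...
    capturedConsumedCallback.getValue().onCompletion(null, consumerRecord);
    // ...then hand back a future that is already complete so the caller never blocks.
    CompletableFuture<Void> done = new CompletableFuture<>();
    done.complete(null);
    return done;
}).when(configLog).readToEnd();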
private void verifyConfigure() {
verify(configStorage).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
@@ -984,36 +1268,22 @@ public class KafkaConfigBackingStoreMockitoTest {
}
}
private void expectPartitionCount(int partitionCount) {
when(configLog.partitionCount()).thenReturn(partitionCount);
}
// Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
// from the log. Validate the data that is captured when the conversion is performed matches the specified data
// (by checking a single field's value)
private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
final String dataFieldName, final Object dataFieldValue) throws Exception {
final ArgumentCaptor<Struct> capturedRecord = ArgumentCaptor.forClass(Struct.class);
-if (serialized != null)
-when(converter.fromConnectData(eq(TOPIC), eq(valueSchema), capturedRecord.capture()))
-.thenReturn(serialized);
+when(converter.fromConnectData(eq(TOPIC), eq(valueSchema), capturedRecord.capture())).thenReturn(serialized);
when(configLog.sendWithReceipt(configKey, serialized)).thenReturn(producerFuture);
when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(null);
-when(converter.toConnectData(TOPIC, serialized)).thenAnswer((Answer<SchemaAndValue>) invocation -> {
-if (dataFieldName != null)
+when(converter.toConnectData(TOPIC, serialized)).thenAnswer(invocation -> {
assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
// Note null schema because default settings for internal serialization are schema-less
-return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
+return new SchemaAndValue(null, structToMap(capturedRecord.getValue()));
});
}
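The helper above leans on an ArgumentCaptor inside a when(...) stub: the Struct passed to fromConnectData is captured at write time, and the toConnectData answer later asserts on that captured value. A tiny, hypothetical example of the same capture-then-answer pattern on an unrelated mock (mockLog and written are invented names):

ArgumentCaptor<String> written = ArgumentCaptor.forClass(String.class);
@SuppressWarnings("unchecked")
List<String> mockLog = mock(List.class);
// Capture whatever the code under test adds...
when(mockLog.add(written.capture())).thenReturn(true);
// ...and let a later call answer based on the captured value.
when(mockLog.size()).thenAnswer(inv -> written.getValue().length());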
private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) {
LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
serializedData.put(key, serializedValue);
expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
}
private void expectRead(LinkedHashMap<String, byte[]> serializedValues,
Map<String, Struct> deserializedValues) {
for (Map.Entry<String, Struct> deserializedValueEntry : deserializedValues.entrySet()) {
@@ -1026,14 +1296,14 @@ public class KafkaConfigBackingStoreMockitoTest {
// This map needs to maintain ordering
private Answer<Future<Void>> expectReadToEnd(final Map<String, byte[]> serializedConfigs) {
return invocation -> {
-TestFuture<Void> future = new TestFuture<>();
for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet()) {
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0, 0,
entry.getKey(), entry.getValue(), new RecordHeaders(), Optional.empty()));
}
-future.resolveOnGet((Void) null);
-return future;
+CompletableFuture<Void> f = new CompletableFuture<>();
+f.complete(null);
+return f;
};
}
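Side note on the future built just above: creating a CompletableFuture and completing it immediately is equivalent to the standard one-liner, so either form works here.

// Equivalent shorthand for an already-completed Future<Void> (plain JDK API):
CompletableFuture<Void> done = CompletableFuture.completedFuture(null);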
@@ -1041,10 +1311,8 @@ public class KafkaConfigBackingStoreMockitoTest {
private Map<String, Object> structToMap(Struct struct) {
if (struct == null)
return null;
-HashMap<String, Object> result = new HashMap<>();
-for (Field field : struct.schema().fields())
-result.put(field.name(), struct.get(field));
+Map<String, Object> result = new HashMap<>();
+for (Field field : struct.schema().fields()) result.put(field.name(), struct.get(field));
return result;
}
}

KafkaConfigBackingStoreTest.java

@@ -28,7 +28,6 @@ import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -591,299 +590,6 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
@Test
public void testBackgroundConnectorDeletion() throws Exception {
// verify that we handle connector deletions correctly when they come up through the log
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserialized);
LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedData.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1));
Map<String, Struct> deserializedData = new HashMap<>();
deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null);
deserializedData.put(TARGET_STATE_KEYS.get(0), null);
expectRead(serializedData, deserializedData);
configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
EasyMock.expectLastCall();
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));
configStorage.refresh(0, TimeUnit.SECONDS);
configState = configStorage.snapshot();
// Connector should now be removed from the snapshot
assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
// Ensure that the deleted connector's deferred task updates have been cleaned up
// in order to prevent unbounded growth of the map
assertEquals(Collections.emptyMap(), configStorage.deferredTaskUpdates);
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
// Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
// We start out by loading an initial configuration where we started to write a task update, and then
// compaction cleaned up the earlier record.
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
// This is the record that has been compacted:
//new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
logOffset = 6;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
// Successful attempt to write new task config
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
"tasks", 1); // Updated to just 1 task
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0)));
EasyMock.expectLastCall();
// Records to be read by consumer as it reads to the end of the log
LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
expectReadToEnd(serializedConfigs);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// After reading the log, it should have been in an inconsistent state
ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
// Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
assertNull(configState.taskConfig(TASK_IDS.get(0)));
assertNull(configState.taskConfig(TASK_IDS.get(1)));
assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
// Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
// we are going to shrink the number of tasks to 1
configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0)));
// Validate updated config
configState = configStorage.snapshot();
// This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
// to the topic. Only the last call with 1 task config + 1 commit actually gets written.
assertEquals(8, configState.offset());
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testPutRestartRequestOnlyFailed() throws Exception {
RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, false);
testPutRestartRequest(restartRequest);
}
@Test
public void testPutRestartRequestOnlyFailedIncludingTasks() throws Exception {
RestartRequest restartRequest = new RestartRequest(CONNECTOR_IDS.get(0), true, true);
testPutRestartRequest(restartRequest);
}
private void testPutRestartRequest(RestartRequest restartRequest) throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectConvertWriteAndRead(
RESTART_CONNECTOR_KEYS.get(0), KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0),
ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
final Capture<RestartRequest> capturedRestartRequest = EasyMock.newCapture();
configUpdateListener.onRestartRequest(EasyMock.capture(capturedRestartRequest));
EasyMock.expectLastCall();
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Writing should block until it is written and read back from Kafka
configStorage.putRestartRequest(restartRequest);
assertEquals(restartRequest.connectorName(), capturedRestartRequest.getValue().connectorName());
assertEquals(restartRequest.onlyFailed(), capturedRestartRequest.getValue().onlyFailed());
assertEquals(restartRequest.includeTasks(), capturedRestartRequest.getValue().includeTasks());
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testRestoreRestartRequestInconsistentState() throws Exception {
// Restoring data should notify only of the latest values after loading is complete. This also validates
// that inconsistent state doesn't prevent startup.
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), RESTART_REQUEST_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), RESTART_REQUEST_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(2), RESTART_REQUEST_STRUCTS.get(2));
deserialized.put(CONFIGS_SERIALIZED.get(3), null);
logOffset = 4;
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testPutLogLevel() throws Exception {
final String logger1 = "org.apache.zookeeper";
final String logger2 = "org.apache.cassandra";
final String logger3 = "org.apache.kafka.clients";
final String logger4 = "org.apache.kafka.connect";
final String level1 = "ERROR";
final String level3 = "WARN";
final String level4 = "DEBUG";
final Struct existingLogLevel = new Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0)
.put("level", level1);
// Pre-populate the config topic with a couple of logger level records; these should be ignored (i.e.,
// not reported to the update listener)
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger1,
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()
),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger2,
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()
)
);
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), existingLogLevel);
// Make sure we gracefully handle tombstones
deserialized.put(CONFIGS_SERIALIZED.get(1), null);
logOffset = 2;
expectConfigure();
expectStart(existingRecords, deserialized);
expectPartitionCount(1);
expectStop();
expectConvertWriteRead(
"logger-cluster-" + logger3, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2),
"level", level3);
expectConvertWriteRead(
"logger-cluster-" + logger4, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3),
"level", level4);
LinkedHashMap<String, byte[]> newRecords = new LinkedHashMap<>();
newRecords.put("logger-cluster-" + logger3, CONFIGS_SERIALIZED.get(2));
newRecords.put("logger-cluster-" + logger4, CONFIGS_SERIALIZED.get(3));
expectReadToEnd(newRecords);
configUpdateListener.onLoggingLevelUpdate(logger3, level3);
EasyMock.expectLastCall();
configUpdateListener.onLoggingLevelUpdate(logger4, level4);
EasyMock.expectLastCall();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
configStorage.putLoggerLevel(logger3, level3);
configStorage.putLoggerLevel(logger4, level4);
configStorage.refresh(0, TimeUnit.SECONDS);
configStorage.stop();
PowerMock.verifyAll();
}
private void expectConfigure() throws Exception {
PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -975,14 +681,6 @@ public class KafkaConfigBackingStoreTest {
});
}
private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
final String dataFieldName, final Object dataFieldValue) throws Exception {
expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
recordsToRead.put(configKey, serialized);
expectReadToEnd(recordsToRead);
}
// Manually insert a connector into config storage, updating the task configs, connector config, and root config
private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");