mirror of https://github.com/apache/kafka.git
KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (#16164)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
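This change replaces the EasyMock/PowerMock record-replay-verify flow (expect()/expectLastCall()/replayAll()/verifyAll()) with Mockito stubbing and verification (when()/doAnswer()/verify()). As a minimal sketch of that style difference, not taken from this commit: the ConfigLog interface and main method below are hypothetical stand-ins for the mocked KafkaBasedLog dependency used by the real tests.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

// Hypothetical stand-in for the KafkaBasedLog dependency mocked in the real test.
interface ConfigLog {
    int partitionCount();
    void start();
    void stop();
}

public class MockitoStyleSketch {
    public static void main(String[] args) {
        // Mockito: stub behaviour up front on a plain mock, no replay step needed.
        ConfigLog configLog = mock(ConfigLog.class);
        when(configLog.partitionCount()).thenReturn(1);

        // Exercise the code under test (here just the mock itself, for brevity).
        configLog.start();
        int partitions = configLog.partitionCount();
        configLog.stop();

        // Verify interactions afterwards, instead of EasyMock's expect() before the
        // call and PowerMock.verifyAll() at the end.
        verify(configLog).start();
        verify(configLog).stop();
        System.out.println("partitions = " + partitions);
    }
}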
parent 2c82ecd67f
commit 9eb05fc729
@@ -61,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -159,7 +160,7 @@ public class KafkaConfigBackingStoreMockitoTest {
             new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
             new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
     );
+    private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
     private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
             .put("state", "PAUSED")
             .put("state.v2", "PAUSED");
@@ -1184,6 +1185,147 @@ public class KafkaConfigBackingStoreMockitoTest {
         verify(configLog).stop();
     }
 
+    @Test
+    public void testPutTaskConfigsZeroTasks() throws Exception {
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+        verify(configLog).start();
+
+        // Records to be read by consumer as it reads to the end of the log
+        doAnswer(expectReadToEnd(new LinkedHashMap<>())).
+                doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0))))
+                .when(configLog).readToEnd();
+
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+                "tasks", 0); // We have 0 tasks
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+
+        // Writing task configs should block until all the writes have been performed and the root record update
+        // has completed
+        List<Map<String, String>> taskConfigs = Collections.emptyList();
+        configStorage.putTaskConfigs("connector1", taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(1, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Collections.emptyList(), configState.tasks(connectorName));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        verify(configUpdateListener).onTaskConfigUpdate(Collections.emptyList());
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testBackgroundUpdateTargetState() throws Exception {
+        // verify that we handle target state changes correctly when they come up through the log
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserializedOnStartup = new LinkedHashMap<>();
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserializedOnStartup);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+        verify(configLog).start();
+
+        // Should see a single connector with initial state started
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configStorage.connectorTargetStates.keySet());
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+
+        LinkedHashMap<String, byte[]> serializedAfterStartup = new LinkedHashMap<>();
+        serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+        doAnswer(expectReadToEnd(serializedAfterStartup)).when(configLog).readToEnd();
+
+        Map<String, Struct> deserializedAfterStartup = new HashMap<>();
+        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED);
+        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED);
+        expectRead(serializedAfterStartup, deserializedAfterStartup);
+
+        // Should see two connectors now, one paused and one stopped
+        configStorage.refresh(0, TimeUnit.SECONDS);
+        verify(configUpdateListener).onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
+        configState = configStorage.snapshot();
+
+        assertEquals(new HashSet<>(CONNECTOR_IDS), configStorage.connectorTargetStates.keySet());
+        assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testSameTargetState() throws Exception {
+        // verify that we handle target state changes correctly when they come up through the log
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+        verify(configLog).start();
+
+        ClusterConfigState configState = configStorage.snapshot();
+        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED);
+        // Should see a single connector with initial state paused
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+
+        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED);
+        configStorage.refresh(0, TimeUnit.SECONDS);
+        // on resume, the update listener shouldn't be called
+        verify(configUpdateListener, never()).onConnectorTargetStateChange(anyString());
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
     @Test
     public void testPutLogLevel() throws Exception {
         final String logger1 = "org.apache.zookeeper";
@@ -1293,6 +1435,12 @@ public class KafkaConfigBackingStoreMockitoTest {
         }
     }
 
+    private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) {
+        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
+        serializedData.put(key, serializedValue);
+        expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
+    }
+
     // This map needs to maintain ordering
     private Answer<Future<Void>> expectReadToEnd(final Map<String, byte[]> serializedConfigs) {
         return invocation -> {
@@ -1315,4 +1463,11 @@ public class KafkaConfigBackingStoreMockitoTest {
         for (Field field : struct.schema().fields()) result.put(field.name(), struct.get(field));
         return result;
     }
+
+    private void addConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
+        for (int i = 0; i < taskConfigs.size(); i++)
+            configStorage.taskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
+        configStorage.connectorConfigs.put(connectorName, connectorConfig);
+        configStorage.connectorTaskCounts.put(connectorName, taskConfigs.size());
+    }
 }

@@ -28,7 +28,6 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
@@ -52,13 +51,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
@@ -430,167 +427,6 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
-    @Test
-    public void testPutTaskConfigsZeroTasks() throws Exception {
-        expectConfigure();
-        expectStart(Collections.emptyList(), Collections.emptyMap());
-
-        // Task configs should read to end, write to the log, read to end, write root.
-        expectReadToEnd(new LinkedHashMap<>());
-        expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
-                "tasks", 0); // We have 0 tasks
-        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
-        configUpdateListener.onTaskConfigUpdate(Collections.emptyList());
-        EasyMock.expectLastCall();
-
-        // Records to be read by consumer as it reads to the end of the log
-        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
-        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
-        expectReadToEnd(serializedConfigs);
-
-        expectPartitionCount(1);
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
-        configStorage.start();
-
-        // Bootstrap as if we had already added the connector, but no tasks had been added yet
-        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
-
-        // Null before writing
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(-1, configState.offset());
-
-        // Writing task configs should block until all the writes have been performed and the root record update
-        // has completed
-        List<Map<String, String>> taskConfigs = Collections.emptyList();
-        configStorage.putTaskConfigs("connector1", taskConfigs);
-
-        // Validate root config by listing all connectors and tasks
-        configState = configStorage.snapshot();
-        assertEquals(1, configState.offset());
-        String connectorName = CONNECTOR_IDS.get(0);
-        assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
-        assertEquals(Collections.emptyList(), configState.tasks(connectorName));
-        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testBackgroundUpdateTargetState() throws Exception {
-        // verify that we handle target state changes correctly when they come up through the log
-
-        expectConfigure();
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
-                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
-                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
-                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
-                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
-        LinkedHashMap<byte[], Struct> deserializedOnStartup = new LinkedHashMap<>();
-        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
-        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
-        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        logOffset = 5;
-
-        expectStart(existingRecords, deserializedOnStartup);
-
-        LinkedHashMap<String, byte[]> serializedAfterStartup = new LinkedHashMap<>();
-        serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
-        serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
-
-        Map<String, Struct> deserializedAfterStartup = new HashMap<>();
-        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED);
-        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED);
-
-        expectRead(serializedAfterStartup, deserializedAfterStartup);
-
-        configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
-        EasyMock.expectLastCall();
-
-        expectPartitionCount(1);
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
-        configStorage.start();
-
-        // Should see a single connector with initial state started
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configStorage.connectorTargetStates.keySet());
-        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
-
-        // Should see two connectors now, one paused and one stopped
-        configStorage.refresh(0, TimeUnit.SECONDS);
-        configState = configStorage.snapshot();
-        assertEquals(new HashSet<>(CONNECTOR_IDS), configStorage.connectorTargetStates.keySet());
-        assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
-        assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testSameTargetState() throws Exception {
-        // verify that we handle target state changes correctly when they come up through the log
-
-        expectConfigure();
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
-                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
-                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
-                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
-                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        logOffset = 5;
-
-        expectStart(existingRecords, deserialized);
-
-        // on resume update listener shouldn't be called
-        configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString());
-        EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected call to onConnectorTargetStateChange"));
-
-        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED);
-
-        expectPartitionCount(1);
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
-        configStorage.start();
-
-        // Should see a single connector with initial state paused
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
-
-        configStorage.refresh(0, TimeUnit.SECONDS);
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -636,12 +472,6 @@ public class KafkaConfigBackingStoreTest {
         }
     }
 
-    private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) {
-        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
-        serializedData.put(key, serializedValue);
-        expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
-    }
-
     // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
     // from the log. Validate the data that is captured when the conversion is performed matches the specified data
     // (by checking a single field's value)