KAFKA-14814: Skip Connect target state updates when the configs store has same state (#13426)

Reviewers: Yash Mayya <yash.mayya@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
Chaitanya Mukka 2023-03-23 20:53:38 +05:30 committed by GitHub
parent 45ecae6a28
commit 3c25b311cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 3 deletions

View File

@ -908,6 +908,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private void processTargetStateRecord(String connectorName, SchemaAndValue value) {
boolean removed = false;
boolean stateChanged = true;
synchronized (lock) {
if (value.value() == null) {
// When connector configs are removed, we also write tombstones for the target state.
@ -935,7 +936,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
try {
TargetState state = TargetState.valueOf((String) targetState);
log.debug("Setting target state for connector '{}' to {}", connectorName, targetState);
connectorTargetStates.put(connectorName, state);
TargetState prevState = connectorTargetStates.put(connectorName, state);
stateChanged = !state.equals(prevState);
} catch (IllegalArgumentException e) {
log.error("Invalid target state for connector '{}': {}", connectorName, targetState);
return;
@ -945,7 +947,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
// Note that we do not notify the update listener if the target state has been removed.
// Instead we depend on the removal callback of the connector config itself to notify the worker.
if (started && !removed)
if (started && !removed && stateChanged)
updateListener.onConnectorTargetStateChange(connectorName);
}

View File

@ -147,9 +147,10 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
if (connectorState == null)
throw new IllegalArgumentException("No connector `" + connector + "` configured");
TargetState prevState = connectorState.targetState;
connectorState.targetState = state;
if (updateListener != null)
if (updateListener != null && !state.equals(prevState))
updateListener.onConnectorTargetStateChange(connector);
}

View File

@ -146,6 +146,7 @@ public class KafkaConfigBackingStoreTest {
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
);
private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@ -889,6 +890,54 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
@Test
public void testSameTargetState() throws Exception {
// verify that we handle target state changes correctly when they come up through the log
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserialized);
// on resume update listener shouldn't be called
configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString());
EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected call to onConnectorTargetStateChange"));
expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED);
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
configStorage.refresh(0, TimeUnit.SECONDS);
configStorage.stop();
PowerMock.verifyAll();
}
@Test
public void testBackgroundConnectorDeletion() throws Exception {
// verify that we handle connector deletions correctly when they come up through the log