KAFKA-14015: Reconfigure tasks if configs have been changed for restarted connectors in standalone mode(#12568)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Yash Mayya 2022-09-06 18:35:21 +05:30 committed by GitHub
parent 0cbf74a940
commit c359558826
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 23 deletions

View File

@ -294,7 +294,12 @@ public class StandaloneHerder extends AbstractHerder {
worker.stopAndAwaitConnector(connName); worker.stopAndAwaitConnector(connName);
startConnector(connName, (error, result) -> cb.onCompletion(error, null)); startConnector(connName, (error, targetState) -> {
if (targetState == TargetState.STARTED) {
requestTaskReconfiguration(connName);
}
cb.onCompletion(error, null);
});
} }
@Override @Override

View File

@ -87,6 +87,7 @@ import static java.util.Collections.singletonMap;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.easymock.EasyMock.eq;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
@ -136,7 +137,7 @@ public class StandaloneHerderTest {
PowerMock.mockStatic(Plugins.class); PowerMock.mockStatic(Plugins.class);
PowerMock.mockStatic(WorkerConnector.class); PowerMock.mockStatic(WorkerConnector.class);
Capture<Map<String, String>> configCapture = Capture.newInstance(); Capture<Map<String, String>> configCapture = Capture.newInstance();
EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes(); EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
} }
@Test @Test
@ -276,7 +277,7 @@ public class StandaloneHerderTest {
} }
@Test @Test
public void testRestartConnector() throws Exception { public void testRestartConnectorSameTaskConfigs() throws Exception {
expectAdd(SourceSink.SOURCE); expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE);
@ -287,13 +288,65 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall(); EasyMock.expectLastCall();
Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(config), EasyMock.anyObject(HerderConnectorContext.class), worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart)); eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
EasyMock.expectLastCall().andAnswer(() -> { EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STARTED); onStart.getValue().onCompletion(null, TargetState.STARTED);
return true; return true;
}); });
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
// same task configs as earlier, so don't expect a new set of tasks to be brought up
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))).andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
FutureCallback<Void> restartCallback = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, restartCallback);
restartCallback.get(1000L, TimeUnit.MILLISECONDS);
PowerMock.verifyAll();
}
@Test
public void testRestartConnectorNewTaskConfigs() throws Exception {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
worker.stopAndAwaitConnector(CONNECTOR_NAME);
EasyMock.expectLastCall();
Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class),
eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STARTED);
return true;
});
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
// changed task configs, expect a new set of tasks to be brought up (and the old ones to be stopped)
Map<String, String> taskConfigs = taskConfig(SourceSink.SOURCE);
taskConfigs.put("k", "v");
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))).andReturn(Collections.singletonList(taskConfigs));
worker.stopAndAwaitTasks(Collections.singletonList(taskId));
EasyMock.expectLastCall();
statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
EasyMock.expectLastCall();
worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), EasyMock.anyObject(), eq(connectorConfig(SourceSink.SOURCE)), eq(taskConfigs), eq(herder), eq(TargetState.STARTED));
EasyMock.expectLastCall().andReturn(true);
PowerMock.replayAll(); PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
@ -319,8 +372,8 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall(); EasyMock.expectLastCall();
Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(config), EasyMock.anyObject(HerderConnectorContext.class), worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart)); eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
Exception exception = new ConnectException("Failed to start connector"); Exception exception = new ConnectException("Failed to start connector");
EasyMock.expectLastCall().andAnswer(() -> { EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(exception, null); onStart.getValue().onCompletion(exception, null);
@ -524,8 +577,8 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall(); EasyMock.expectLastCall();
Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig), EasyMock.anyObject(HerderConnectorContext.class), worker.startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart)); eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
EasyMock.expectLastCall().andAnswer(() -> { EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STARTED); onStart.getValue().onCompletion(null, TargetState.STARTED);
return true; return true;
@ -630,8 +683,8 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall(); EasyMock.expectLastCall();
Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig), EasyMock.anyObject(HerderConnectorContext.class), worker.startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart)); eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
EasyMock.expectLastCall().andAnswer(() -> { EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STARTED); onStart.getValue().onCompletion(null, TargetState.STARTED);
return true; return true;
@ -746,7 +799,7 @@ public class StandaloneHerderTest {
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
EasyMock.reset(transformer); EasyMock.reset(transformer);
EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.anyObject())) EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.anyObject()))
.andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info")) .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info"))
.anyTimes(); .anyTimes();
EasyMock.replay(transformer); EasyMock.replay(transformer);
@ -782,19 +835,15 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall(); EasyMock.expectLastCall();
Capture<Map<String, String>> capturedConfig = EasyMock.newCapture(); Capture<Map<String, String>> capturedConfig = EasyMock.newCapture();
Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(capturedConfig), EasyMock.anyObject(), worker.startConnector(eq(CONNECTOR_NAME), EasyMock.capture(capturedConfig), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart)); eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
EasyMock.expectLastCall().andAnswer(() -> { EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STARTED); onStart.getValue().onCompletion(null, TargetState.STARTED);
return true; return true;
}); });
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
EasyMock.expect(worker.isTopicCreationEnabled()).andReturn(true);
// Generate same task config, which should result in no additional action to restart tasks // Generate same task config, which should result in no additional action to restart tasks
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true)))
.andReturn(singletonList(taskConfig(SourceSink.SOURCE))); .andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
worker.isSinkConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(false);
expectConfigValidation(connectorMock, false, newConnConfig); expectConfigValidation(connectorMock, false, newConnConfig);
connectorConfigCb.onCompletion(null, newConnConfig); connectorConfigCb.onCompletion(null, newConnConfig);
@ -888,15 +937,15 @@ public class StandaloneHerderTest {
new SinkConnectorConfig(plugins, connectorProps); new SinkConnectorConfig(plugins, connectorProps);
Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class), worker.startConnector(eq(CONNECTOR_NAME), eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), EasyMock.capture(onStart)); eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
EasyMock.expectLastCall().andAnswer(() -> { EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STARTED); onStart.getValue().onCompletion(null, TargetState.STARTED);
return true; return true;
}); });
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes();
if (sourceSink == SourceSink.SOURCE) { if (sourceSink == SourceSink.SOURCE) {
EasyMock.expect(worker.isTopicCreationEnabled()).andReturn(true); EasyMock.expect(worker.isTopicCreationEnabled()).andReturn(true).anyTimes();
} }
// And we should instantiate the tasks. For a sink task, we should see added properties for the input topic partitions // And we should instantiate the tasks. For a sink task, we should see added properties for the input topic partitions
@ -930,7 +979,7 @@ public class StandaloneHerderTest {
EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName())) EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
.andReturn(ConnectorType.SINK).anyTimes(); .andReturn(ConnectorType.SINK).anyTimes();
worker.isSinkConnector(CONNECTOR_NAME); worker.isSinkConnector(CONNECTOR_NAME);
PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK); PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK).anyTimes();
} }
private ConnectorInfo createdInfo(SourceSink sourceSink) { private ConnectorInfo createdInfo(SourceSink sourceSink) {