diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 174b032ea24..2a27103079a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1185,5 +1185,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return loggers.setLevel(namespace, level); } - } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index a8a6e7858d8..f33fa6fac46 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -121,6 +121,14 @@ public interface Herder { void putConnectorConfig(String connName, Map config, TargetState targetState, boolean allowReplace, Callback> callback); + /** + * Patch the configuration for a connector. + * @param connName name of the connector + * @param configPatch the connector's configuration patch. + * @param callback callback to invoke when the configuration has been written + */ + void patchConnectorConfig(String connName, Map configPatch, Callback> callback); + /** * Delete a connector and its configuration. * @param connName name of the connector diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 22a9640d4ef..f3f2ae7e934 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1096,54 +1096,82 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.trace("Submitting connector config write request {}", connName); addRequest( () -> { - validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> { - if (error != null) { - callback.onCompletion(error, null); - return; - } - - // Complete the connector config write via another herder request in order to - // perform the write to the backing store (or forward to the leader) during - // the "external request" portion of the tick loop - addRequest( - () -> { - if (maybeAddConfigErrors(configInfos, callback)) { - return null; - } - - log.trace("Handling connector config request {}", connName); - if (!isLeader()) { - callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null); - return null; - } - boolean exists = configState.contains(connName); - if (!allowReplace && exists) { - callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null); - return null; - } - - log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); - writeToConfigTopicAsLeader( - "writing a config for connector " + connName + " to the config topic", - () -> configBackingStore.putConnectorConfig(connName, config, targetState) - ); - - // Note that we use the updated connector config despite the fact that we don't have an updated - // snapshot yet. The existing task info should still be accurate. - ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName), - connectorType(config)); - callback.onCompletion(null, new Created<>(!exists, info)); - return null; - }, - forwardErrorAndTickThreadStages(callback) - ); - })); + doPutConnectorConfig(connName, config, targetState, allowReplace, callback); return null; }, forwardErrorAndTickThreadStages(callback) ); } + @Override + public void patchConnectorConfig(String connName, Map configPatch, Callback> callback) { + log.trace("Submitting connector config patch request {}", connName); + addRequest(() -> { + // This reduces (but not completely eliminates) the chance for race conditions. + if (!isLeader()) { + callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null); + return null; + } + + ConnectorInfo connectorInfo = connectorInfo(connName); + if (connectorInfo == null) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + } else { + Map patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch); + doPutConnectorConfig(connName, patchedConfig, null, true, callback); + } + return null; + }, forwardErrorAndTickThreadStages(callback)); + } + + private void doPutConnectorConfig( + String connName, + Map config, + TargetState targetState, boolean allowReplace, + Callback> callback) { + validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> { + if (error != null) { + callback.onCompletion(error, null); + return; + } + + // Complete the connector config write via another herder request in order to + // perform the write to the backing store (or forward to the leader) during + // the "external request" portion of the tick loop + addRequest( + () -> { + if (maybeAddConfigErrors(configInfos, callback)) { + return null; + } + + log.trace("Handling connector config request {}", connName); + if (!isLeader()) { + callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null); + return null; + } + boolean exists = configState.contains(connName); + if (!allowReplace && exists) { + callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null); + return null; + } + + log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); + writeToConfigTopicAsLeader( + "writing a config for connector " + connName + " to the config topic", + () -> configBackingStore.putConnectorConfig(connName, config, targetState) + ); + + // Note that we use the updated connector config despite the fact that we don't have an updated + // snapshot yet. The existing task info should still be accurate. + ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName), + connectorType(config)); + callback.onCompletion(null, new Created<>(!exists, info)); + return null; + }, + forwardErrorAndTickThreadStages(callback) + ); + })); + } @Override public void stopConnector(final String connName, final Callback callback) { log.trace("Submitting request to transition connector {} to STOPPED state", connName); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 7a0f02ca215..532af8730ca 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -242,6 +242,19 @@ public class ConnectorsResource { return response.entity(createdInfo.result()).build(); } + @PATCH + @Path("/{connector}/config") + public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, + final Map connectorConfigPatch) throws Throwable { + FutureCallback> cb = new FutureCallback<>(); + herder.patchConnectorConfig(connector, connectorConfigPatch, cb); + Herder.Created createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", + "PATCH", headers, connectorConfigPatch, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); + return Response.ok().entity(createdInfo.result()).build(); + } + @POST @Path("/{connector}/restart") @Operation(summary = "Restart the specified connector") diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index e1386bf6ac4..e773eeefd5c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -47,6 +47,7 @@ import org.apache.kafka.connect.storage.MemoryConfigBackingStore; import org.apache.kafka.connect.storage.MemoryStatusBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,6 +247,31 @@ public class StandaloneHerder extends AbstractHerder { } } + @Override + public synchronized void patchConnectorConfig(String connName, Map configPatch, Callback> callback) { + try { + ConnectorInfo connectorInfo = connectorInfo(connName); + if (connectorInfo == null) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + return; + } + + Map patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch); + validateConnectorConfig(patchedConfig, (error, configInfos) -> { + if (error != null) { + callback.onCompletion(error, null); + return; + } + + requestExecutorService.submit( + () -> putConnectorConfig(connName, patchedConfig, null, true, callback, configInfos) + ); + }); + } catch (Throwable e) { + callback.onCompletion(e, null); + } + } + @Override public synchronized void stopConnector(String connName, Callback callback) { try { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 3a2c88b0893..06ea5a64146 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -218,4 +219,28 @@ public final class ConnectUtils { public static String className(Object o) { return o != null ? o.getClass().getName() : "null"; } + + /** + * Apply a patch on a connector config. + * + *

In the output, the values from the patch will override the values from the config. + * {@code null} values will cause the corresponding key to be removed completely. + * @param config the config to be patched. + * @param patch the patch. + * @return the output config map. + */ + public static Map patchConfig( + Map config, + Map patch + ) { + Map result = new HashMap<>(config); + patch.forEach((k, v) -> { + if (v != null) { + result.put(k, v); + } else { + result.remove(k); + } + }); + return result; + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 35bfeab7c3d..65f2dc8daa8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -773,6 +773,43 @@ public class ConnectWorkerIntegrationTest { connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time"); } + @Test + public void testPatchConnectorConfig() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + connect.kafka().createTopic(TOPIC_NAME); + + Map props = defaultSinkConnectorProps(TOPIC_NAME); + props.put("unaffected-key", "unaffected-value"); + props.put("to-be-deleted-key", "value"); + props.put(TASKS_MAX_CONFIG, "2"); + + Map patch = new HashMap<>(); + patch.put(TASKS_MAX_CONFIG, "3"); // this plays as a value to be changed + patch.put("to-be-added-key", "value"); + patch.put("to-be-deleted-key", null); + + connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2, + "connector and tasks did not start in time"); + + connect.patchConnectorConfig(CONNECTOR_NAME, patch); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 3, + "connector and tasks did not reconfigure and restart in time"); + + Map expectedConfig = new HashMap<>(props); + expectedConfig.put("name", CONNECTOR_NAME); + expectedConfig.put("to-be-added-key", "value"); + expectedConfig.put(TASKS_MAX_CONFIG, "3"); + expectedConfig.remove("to-be-deleted-key"); + assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config()); + } + private Map defaultSinkConnectorProps(String topics) { // setup props for the sink connector Map props = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index fa05e55015e..f4a7cd247bd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -2336,6 +2336,133 @@ public class DistributedHerderTest { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } + @Test + public void testPatchConnectorConfigNotFound() { + when(member.memberId()).thenReturn("leader"); + expectRebalance(0, Collections.emptyList(), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + + ClusterConfigState clusterConfigState = new ClusterConfigState( + 0, + null, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(clusterConfigState); + + Map connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + FutureCallback> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); + herder.tick(); + assertTrue(patchCallback.isDone()); + ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); + assertInstanceOf(NotFoundException.class, exception.getCause()); + } + + @Test + public void testPatchConnectorConfigNotALeader() { + when(member.memberId()).thenReturn("not-leader"); + + // The connector is pre-existing due to the mocks. + ClusterConfigState originalSnapshot = new ClusterConfigState( + 1, + null, + Collections.singletonMap(CONN1, 0), + Collections.singletonMap(CONN1, CONN1_CONFIG), + Collections.singletonMap(CONN1, TargetState.STARTED), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(originalSnapshot); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + + // Patch the connector config. + + expectMemberEnsureActive(); + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false); + + FutureCallback> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback); + herder.tick(); + assertTrue(patchCallback.isDone()); + ExecutionException fencingException = assertThrows(ExecutionException.class, patchCallback::get); + assertInstanceOf(ConnectException.class, fencingException.getCause()); + } + + @Test + public void testPatchConnectorConfig() throws Exception { + when(member.memberId()).thenReturn("leader"); + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + + Map originalConnConfig = new HashMap<>(CONN1_CONFIG); + originalConnConfig.put("foo0", "unaffected"); + originalConnConfig.put("foo1", "will-be-changed"); + originalConnConfig.put("foo2", "will-be-removed"); + + // The connector is pre-existing due to the mocks. + + ClusterConfigState originalSnapshot = new ClusterConfigState( + 1, + null, + Collections.singletonMap(CONN1, 0), + Collections.singletonMap(CONN1, originalConnConfig), + Collections.singletonMap(CONN1, TargetState.STARTED), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(originalSnapshot); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + + expectMemberPoll(); + + // Patch the connector config. + Map connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "changed"); + connConfigPatch.put("foo2", null); + connConfigPatch.put("foo3", "added"); + + Map patchedConnConfig = new HashMap<>(originalConnConfig); + patchedConnConfig.put("foo0", "unaffected"); + patchedConnConfig.put("foo1", "changed"); + patchedConnConfig.remove("foo2"); + patchedConnConfig.put("foo3", "added"); + + expectMemberEnsureActive(); + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + + ArgumentCaptor> validateCallback = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { + validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS); + return null; + }).when(herder).validateConnectorConfig(eq(patchedConnConfig), validateCallback.capture()); + + FutureCallback> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONN1, connConfigPatch, patchCallback); + herder.tick(); + assertTrue(patchCallback.isDone()); + assertEquals(patchedConnConfig, patchCallback.get().result().config()); + + // This is effectively the main check of this test: + // we validate that what's written in the config storage is the patched config. + verify(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull()); + verifyNoMoreInteractions(configBackingStore); + + // No need to check herder.connectorConfig explicitly: + // all the related parts are mocked and that the config is correct is checked by eq()'s in the mocked called above. + } + @Test public void testKeyRotationWhenWorkerBecomesLeader() throws Exception { long rotationTtlDelay = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 65f27f7e925..2386b2558e1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -109,6 +109,17 @@ public class ConnectorsResourceTest { CONNECTOR_CONFIG.put("sample_config", "test_config"); } + private static final Map CONNECTOR_CONFIG_PATCH = new HashMap<>(); + static { + CONNECTOR_CONFIG_PATCH.put("sample_config", "test_config_new"); + CONNECTOR_CONFIG_PATCH.put("sample_config_2", "test_config_2"); + } + + private static final Map CONNECTOR_CONFIG_PATCHED = new HashMap<>(CONNECTOR_CONFIG); + static { + CONNECTOR_CONFIG_PATCHED.putAll(CONNECTOR_CONFIG_PATCH); + } + private static final Map CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>(); static { CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1); @@ -588,6 +599,37 @@ public class ConnectorsResourceTest { assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request)); } + @Test + public void testPatchConnectorConfig() throws Throwable { + final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG_PATCHED, CONNECTOR_TASK_NAMES, + ConnectorType.SINK)) + ).when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture()); + + connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH); + } + + @Test + public void testPatchConnectorConfigLeaderRedirect() throws Throwable { + final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb) + .when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture()); + when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/config?forward=false"), eq("PATCH"), isNull(), eq(CONNECTOR_CONFIG_PATCH), any())) + .thenReturn(new RestClient.HttpResponse<>(200, new HashMap<>(CONNECTOR_CONFIG_PATCHED), null)); + + connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH); + } + + @Test + public void testPatchConnectorConfigNotFound() throws Throwable { + final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("Connector " + CONNECTOR_NAME + " not found")) + .when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture()); + + assertThrows(NotFoundException.class, () -> connectorsResource.patchConnectorConfig( + CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH)); + } + @Test public void testGetConnectorTaskConfigs() throws Throwable { final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index bed08ffa2eb..e8ab2add181 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -77,7 +77,9 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import static java.util.Collections.emptyList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; @@ -751,6 +753,77 @@ public class StandaloneHerderTest { verifyNoMoreInteractions(connectorConfigCb); } + @Test + public void testPatchConnectorConfigNotFound() { + Map connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + Callback> patchCallback = mock(Callback.class); + herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); + verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); + assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); + } + + @Test + public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException { + // Create the connector. + Map originalConnConfig = connectorConfig(SourceSink.SOURCE); + originalConnConfig.put("foo0", "unaffected"); + originalConnConfig.put("foo1", "will-be-changed"); + originalConnConfig.put("foo2", "will-be-removed"); + + Map connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "changed"); + connConfigPatch.put("foo2", null); + connConfigPatch.put("foo3", "added"); + + Map patchedConnConfig = new HashMap<>(originalConnConfig); + patchedConnConfig.put("foo0", "unaffected"); + patchedConnConfig.put("foo1", "changed"); + patchedConnConfig.remove("foo2"); + patchedConnConfig.put("foo3", "added"); + + expectAdd(SourceSink.SOURCE); + expectConfigValidation(SourceSink.SOURCE, originalConnConfig, patchedConnConfig); + + expectConnectorStartingWithoutTasks(originalConnConfig, SourceSink.SOURCE); + + herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, createCallback); + createCallback.get(1000L, TimeUnit.SECONDS); + + expectConnectorStartingWithoutTasks(patchedConnConfig, SourceSink.SOURCE); + + FutureCallback> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + + Map returnedConfig = patchCallback.get(1000L, TimeUnit.SECONDS).result().config(); + assertEquals(patchedConnConfig, returnedConfig); + + // Also check the returned config when requested. + FutureCallback> configCallback = new FutureCallback<>(); + herder.connectorConfig(CONNECTOR_NAME, configCallback); + + Map returnedConfig2 = configCallback.get(1000L, TimeUnit.SECONDS); + assertEquals(patchedConnConfig, returnedConfig2); + } + + private void expectConnectorStartingWithoutTasks(Map config, SourceSink sourceSink) { + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); + final ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { + onStart.getValue().onCompletion(null, TargetState.STARTED); + return true; + }).when(worker).startConnector(eq(CONNECTOR_NAME), any(Map.class), any(), + eq(herder), eq(TargetState.STARTED), onStart.capture()); + ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ? + new SourceConnectorConfig(plugins, config, true) : + new SinkConnectorConfig(plugins, config); + when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig)) + .thenReturn(emptyList()); + } + @Test public void testPutTaskConfigs() { Callback cb = mock(Callback.class); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java index 0bdfa0d8a91..42fe5d1eb13 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java @@ -169,4 +169,24 @@ public class ConnectUtilsTest { assertEquals(expectedClientIdBase, actualClientIdBase); } + @Test + public void testPatchConfig() { + HashMap config = new HashMap<>(); + config.put("unaffected-key", "unaffected-value"); + config.put("to-be-changed-key", "to-be-changed-value-old"); + config.put("to-be-deleted-key", "to-be-deleted-value"); + + HashMap patch = new HashMap<>(); + patch.put("to-be-changed-key", "to-be-changed-value-new"); + patch.put("to-be-deleted-key", null); + patch.put("to-be-added-key", "to-be-added-value"); + + HashMap expectedResult = new HashMap<>(); + expectedResult.put("unaffected-key", "unaffected-value"); + expectedResult.put("to-be-changed-key", "to-be-changed-value-new"); + expectedResult.put("to-be-added-key", "to-be-added-value"); + + Map result = ConnectUtils.patchConfig(config, patch); + assertEquals(expectedResult, result); + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index 0e58d633584..a37de76d1ae 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -271,6 +271,43 @@ abstract class EmbeddedConnect { "Could not execute PUT request. Error response: " + responseToString(response)); } + /** + * Patch the config of a connector. + * + * @param connName the name of the connector + * @param connConfigPatch the configuration patch + * @throws ConnectRestException if the REST API returns error status + * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent + */ + public String patchConnectorConfig(String connName, Map connConfigPatch) { + String url = endpointForResource(String.format("connectors/%s/config", connName)); + return doPatchConnectorConfig(url, connConfigPatch); + } + + /** + * Execute a PATCH request with the given connector configuration on the given URL endpoint. + * + * @param url the full URL of the endpoint that corresponds to the given REST resource + * @param connConfigPatch the configuration patch + * @throws ConnectRestException if the REST api returns error status + * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent + */ + protected String doPatchConnectorConfig(String url, Map connConfigPatch) { + ObjectMapper mapper = new ObjectMapper(); + String content; + try { + content = mapper.writeValueAsString(connConfigPatch); + } catch (IOException e) { + throw new ConnectException("Could not serialize connector configuration and execute PUT request"); + } + Response response = requestPatch(url, content); + if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { + return responseToString(response); + } + throw new ConnectRestException(response.getStatus(), + "Could not execute PATCH request. Error response: " + responseToString(response)); + } + /** * Delete an existing connector. * diff --git a/docs/connect.html b/docs/connect.html index c06879813fb..449f066fd0a 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -291,6 +291,7 @@ predicates.IsBar.pattern=bar

  • GET /connectors/{name} - get information about a specific connector
  • GET /connectors/{name}/config - get the configuration parameters for a specific connector
  • PUT /connectors/{name}/config - update the configuration parameters for a specific connector
  • +
  • PATCH /connectors/{name}/config - patch the configuration parameters for a specific connector, where null values in the JSON body indicates removing of the key from the final configuration
  • GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • GET /connectors/{name}/tasks - get a list of tasks currently running for a connector along with their configurations
  • GET /connectors/{name}/tasks-config - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the GET /connectors/{name}/tasks endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the OpenAPI documentation for more details