mirror of https://github.com/apache/kafka.git
KAFKA-16445: Add PATCH method for connector config (#6934)
Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
parent
f4fdaa702a
commit
5a9ccb6b77
|
@ -1185,5 +1185,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
|
||||||
|
|
||||||
return loggers.setLevel(namespace, level);
|
return loggers.setLevel(namespace, level);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,6 +121,14 @@ public interface Herder {
|
||||||
void putConnectorConfig(String connName, Map<String, String> config, TargetState targetState, boolean allowReplace,
|
void putConnectorConfig(String connName, Map<String, String> config, TargetState targetState, boolean allowReplace,
|
||||||
Callback<Created<ConnectorInfo>> callback);
|
Callback<Created<ConnectorInfo>> callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Patch the configuration for a connector.
|
||||||
|
* @param connName name of the connector
|
||||||
|
* @param configPatch the connector's configuration patch.
|
||||||
|
* @param callback callback to invoke when the configuration has been written
|
||||||
|
*/
|
||||||
|
void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete a connector and its configuration.
|
* Delete a connector and its configuration.
|
||||||
* @param connName name of the connector
|
* @param connName name of the connector
|
||||||
|
|
|
@ -1096,6 +1096,39 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
||||||
log.trace("Submitting connector config write request {}", connName);
|
log.trace("Submitting connector config write request {}", connName);
|
||||||
addRequest(
|
addRequest(
|
||||||
() -> {
|
() -> {
|
||||||
|
doPutConnectorConfig(connName, config, targetState, allowReplace, callback);
|
||||||
|
return null;
|
||||||
|
},
|
||||||
|
forwardErrorAndTickThreadStages(callback)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
|
||||||
|
log.trace("Submitting connector config patch request {}", connName);
|
||||||
|
addRequest(() -> {
|
||||||
|
// This reduces (but not completely eliminates) the chance for race conditions.
|
||||||
|
if (!isLeader()) {
|
||||||
|
callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectorInfo connectorInfo = connectorInfo(connName);
|
||||||
|
if (connectorInfo == null) {
|
||||||
|
callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
|
||||||
|
} else {
|
||||||
|
Map<String, String> patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch);
|
||||||
|
doPutConnectorConfig(connName, patchedConfig, null, true, callback);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}, forwardErrorAndTickThreadStages(callback));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doPutConnectorConfig(
|
||||||
|
String connName,
|
||||||
|
Map<String, String> config,
|
||||||
|
TargetState targetState, boolean allowReplace,
|
||||||
|
Callback<Created<ConnectorInfo>> callback) {
|
||||||
validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> {
|
validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
callback.onCompletion(error, null);
|
callback.onCompletion(error, null);
|
||||||
|
@ -1138,12 +1171,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
||||||
forwardErrorAndTickThreadStages(callback)
|
forwardErrorAndTickThreadStages(callback)
|
||||||
);
|
);
|
||||||
}));
|
}));
|
||||||
return null;
|
|
||||||
},
|
|
||||||
forwardErrorAndTickThreadStages(callback)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stopConnector(final String connName, final Callback<Void> callback) {
|
public void stopConnector(final String connName, final Callback<Void> callback) {
|
||||||
log.trace("Submitting request to transition connector {} to STOPPED state", connName);
|
log.trace("Submitting request to transition connector {} to STOPPED state", connName);
|
||||||
|
|
|
@ -242,6 +242,19 @@ public class ConnectorsResource {
|
||||||
return response.entity(createdInfo.result()).build();
|
return response.entity(createdInfo.result()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PATCH
|
||||||
|
@Path("/{connector}/config")
|
||||||
|
public Response patchConnectorConfig(final @PathParam("connector") String connector,
|
||||||
|
final @Context HttpHeaders headers,
|
||||||
|
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
|
||||||
|
final Map<String, String> connectorConfigPatch) throws Throwable {
|
||||||
|
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
|
||||||
|
herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
|
||||||
|
Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
|
||||||
|
"PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
|
||||||
|
return Response.ok().entity(createdInfo.result()).build();
|
||||||
|
}
|
||||||
|
|
||||||
@POST
|
@POST
|
||||||
@Path("/{connector}/restart")
|
@Path("/{connector}/restart")
|
||||||
@Operation(summary = "Restart the specified connector")
|
@Operation(summary = "Restart the specified connector")
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
|
||||||
import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
|
import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
|
||||||
import org.apache.kafka.connect.storage.StatusBackingStore;
|
import org.apache.kafka.connect.storage.StatusBackingStore;
|
||||||
import org.apache.kafka.connect.util.Callback;
|
import org.apache.kafka.connect.util.Callback;
|
||||||
|
import org.apache.kafka.connect.util.ConnectUtils;
|
||||||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -246,6 +247,31 @@ public class StandaloneHerder extends AbstractHerder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
|
||||||
|
try {
|
||||||
|
ConnectorInfo connectorInfo = connectorInfo(connName);
|
||||||
|
if (connectorInfo == null) {
|
||||||
|
callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, String> patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch);
|
||||||
|
validateConnectorConfig(patchedConfig, (error, configInfos) -> {
|
||||||
|
if (error != null) {
|
||||||
|
callback.onCompletion(error, null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
requestExecutorService.submit(
|
||||||
|
() -> putConnectorConfig(connName, patchedConfig, null, true, callback, configInfos)
|
||||||
|
);
|
||||||
|
});
|
||||||
|
} catch (Throwable e) {
|
||||||
|
callback.onCompletion(e, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void stopConnector(String connName, Callback<Void> callback) {
|
public synchronized void stopConnector(String connName, Callback<Void> callback) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -218,4 +219,28 @@ public final class ConnectUtils {
|
||||||
public static String className(Object o) {
|
public static String className(Object o) {
|
||||||
return o != null ? o.getClass().getName() : "null";
|
return o != null ? o.getClass().getName() : "null";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply a patch on a connector config.
|
||||||
|
*
|
||||||
|
* <p>In the output, the values from the patch will override the values from the config.
|
||||||
|
* {@code null} values will cause the corresponding key to be removed completely.
|
||||||
|
* @param config the config to be patched.
|
||||||
|
* @param patch the patch.
|
||||||
|
* @return the output config map.
|
||||||
|
*/
|
||||||
|
public static Map<String, String> patchConfig(
|
||||||
|
Map<String, String> config,
|
||||||
|
Map<String, String> patch
|
||||||
|
) {
|
||||||
|
Map<String, String> result = new HashMap<>(config);
|
||||||
|
patch.forEach((k, v) -> {
|
||||||
|
if (v != null) {
|
||||||
|
result.put(k, v);
|
||||||
|
} else {
|
||||||
|
result.remove(k);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -773,6 +773,43 @@ public class ConnectWorkerIntegrationTest {
|
||||||
connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
|
connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfig() throws Exception {
|
||||||
|
connect = connectBuilder.build();
|
||||||
|
// start the clusters
|
||||||
|
connect.start();
|
||||||
|
|
||||||
|
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
|
||||||
|
"Initial group of workers did not start in time.");
|
||||||
|
|
||||||
|
connect.kafka().createTopic(TOPIC_NAME);
|
||||||
|
|
||||||
|
Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
|
||||||
|
props.put("unaffected-key", "unaffected-value");
|
||||||
|
props.put("to-be-deleted-key", "value");
|
||||||
|
props.put(TASKS_MAX_CONFIG, "2");
|
||||||
|
|
||||||
|
Map<String, String> patch = new HashMap<>();
|
||||||
|
patch.put(TASKS_MAX_CONFIG, "3"); // this plays as a value to be changed
|
||||||
|
patch.put("to-be-added-key", "value");
|
||||||
|
patch.put("to-be-deleted-key", null);
|
||||||
|
|
||||||
|
connect.configureConnector(CONNECTOR_NAME, props);
|
||||||
|
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2,
|
||||||
|
"connector and tasks did not start in time");
|
||||||
|
|
||||||
|
connect.patchConnectorConfig(CONNECTOR_NAME, patch);
|
||||||
|
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 3,
|
||||||
|
"connector and tasks did not reconfigure and restart in time");
|
||||||
|
|
||||||
|
Map<String, String> expectedConfig = new HashMap<>(props);
|
||||||
|
expectedConfig.put("name", CONNECTOR_NAME);
|
||||||
|
expectedConfig.put("to-be-added-key", "value");
|
||||||
|
expectedConfig.put(TASKS_MAX_CONFIG, "3");
|
||||||
|
expectedConfig.remove("to-be-deleted-key");
|
||||||
|
assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config());
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, String> defaultSinkConnectorProps(String topics) {
|
private Map<String, String> defaultSinkConnectorProps(String topics) {
|
||||||
// setup props for the sink connector
|
// setup props for the sink connector
|
||||||
Map<String, String> props = new HashMap<>();
|
Map<String, String> props = new HashMap<>();
|
||||||
|
|
|
@ -2336,6 +2336,133 @@ public class DistributedHerderTest {
|
||||||
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
|
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfigNotFound() {
|
||||||
|
when(member.memberId()).thenReturn("leader");
|
||||||
|
expectRebalance(0, Collections.emptyList(), Collections.emptyList(), true);
|
||||||
|
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
|
||||||
|
|
||||||
|
ClusterConfigState clusterConfigState = new ClusterConfigState(
|
||||||
|
0,
|
||||||
|
null,
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptySet(),
|
||||||
|
Collections.emptySet());
|
||||||
|
expectConfigRefreshAndSnapshot(clusterConfigState);
|
||||||
|
|
||||||
|
Map<String, String> connConfigPatch = new HashMap<>();
|
||||||
|
connConfigPatch.put("foo1", "baz1");
|
||||||
|
|
||||||
|
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
|
||||||
|
herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
|
||||||
|
herder.tick();
|
||||||
|
assertTrue(patchCallback.isDone());
|
||||||
|
ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
|
||||||
|
assertInstanceOf(NotFoundException.class, exception.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfigNotALeader() {
|
||||||
|
when(member.memberId()).thenReturn("not-leader");
|
||||||
|
|
||||||
|
// The connector is pre-existing due to the mocks.
|
||||||
|
ClusterConfigState originalSnapshot = new ClusterConfigState(
|
||||||
|
1,
|
||||||
|
null,
|
||||||
|
Collections.singletonMap(CONN1, 0),
|
||||||
|
Collections.singletonMap(CONN1, CONN1_CONFIG),
|
||||||
|
Collections.singletonMap(CONN1, TargetState.STARTED),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptySet(),
|
||||||
|
Collections.emptySet());
|
||||||
|
expectConfigRefreshAndSnapshot(originalSnapshot);
|
||||||
|
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
|
||||||
|
|
||||||
|
// Patch the connector config.
|
||||||
|
|
||||||
|
expectMemberEnsureActive();
|
||||||
|
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false);
|
||||||
|
|
||||||
|
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
|
||||||
|
herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback);
|
||||||
|
herder.tick();
|
||||||
|
assertTrue(patchCallback.isDone());
|
||||||
|
ExecutionException fencingException = assertThrows(ExecutionException.class, patchCallback::get);
|
||||||
|
assertInstanceOf(ConnectException.class, fencingException.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfig() throws Exception {
|
||||||
|
when(member.memberId()).thenReturn("leader");
|
||||||
|
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
|
||||||
|
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
|
||||||
|
|
||||||
|
Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
|
||||||
|
originalConnConfig.put("foo0", "unaffected");
|
||||||
|
originalConnConfig.put("foo1", "will-be-changed");
|
||||||
|
originalConnConfig.put("foo2", "will-be-removed");
|
||||||
|
|
||||||
|
// The connector is pre-existing due to the mocks.
|
||||||
|
|
||||||
|
ClusterConfigState originalSnapshot = new ClusterConfigState(
|
||||||
|
1,
|
||||||
|
null,
|
||||||
|
Collections.singletonMap(CONN1, 0),
|
||||||
|
Collections.singletonMap(CONN1, originalConnConfig),
|
||||||
|
Collections.singletonMap(CONN1, TargetState.STARTED),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.emptySet(),
|
||||||
|
Collections.emptySet());
|
||||||
|
expectConfigRefreshAndSnapshot(originalSnapshot);
|
||||||
|
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
|
||||||
|
|
||||||
|
expectMemberPoll();
|
||||||
|
|
||||||
|
// Patch the connector config.
|
||||||
|
Map<String, String> connConfigPatch = new HashMap<>();
|
||||||
|
connConfigPatch.put("foo1", "changed");
|
||||||
|
connConfigPatch.put("foo2", null);
|
||||||
|
connConfigPatch.put("foo3", "added");
|
||||||
|
|
||||||
|
Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
|
||||||
|
patchedConnConfig.put("foo0", "unaffected");
|
||||||
|
patchedConnConfig.put("foo1", "changed");
|
||||||
|
patchedConnConfig.remove("foo2");
|
||||||
|
patchedConnConfig.put("foo3", "added");
|
||||||
|
|
||||||
|
expectMemberEnsureActive();
|
||||||
|
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
|
||||||
|
|
||||||
|
ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class);
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS);
|
||||||
|
return null;
|
||||||
|
}).when(herder).validateConnectorConfig(eq(patchedConnConfig), validateCallback.capture());
|
||||||
|
|
||||||
|
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
|
||||||
|
herder.patchConnectorConfig(CONN1, connConfigPatch, patchCallback);
|
||||||
|
herder.tick();
|
||||||
|
assertTrue(patchCallback.isDone());
|
||||||
|
assertEquals(patchedConnConfig, patchCallback.get().result().config());
|
||||||
|
|
||||||
|
// This is effectively the main check of this test:
|
||||||
|
// we validate that what's written in the config storage is the patched config.
|
||||||
|
verify(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull());
|
||||||
|
verifyNoMoreInteractions(configBackingStore);
|
||||||
|
|
||||||
|
// No need to check herder.connectorConfig explicitly:
|
||||||
|
// all the related parts are mocked and that the config is correct is checked by eq()'s in the mocked called above.
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
|
public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
|
||||||
long rotationTtlDelay = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;
|
long rotationTtlDelay = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;
|
||||||
|
|
|
@ -109,6 +109,17 @@ public class ConnectorsResourceTest {
|
||||||
CONNECTOR_CONFIG.put("sample_config", "test_config");
|
CONNECTOR_CONFIG.put("sample_config", "test_config");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final Map<String, String> CONNECTOR_CONFIG_PATCH = new HashMap<>();
|
||||||
|
static {
|
||||||
|
CONNECTOR_CONFIG_PATCH.put("sample_config", "test_config_new");
|
||||||
|
CONNECTOR_CONFIG_PATCH.put("sample_config_2", "test_config_2");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Map<String, String> CONNECTOR_CONFIG_PATCHED = new HashMap<>(CONNECTOR_CONFIG);
|
||||||
|
static {
|
||||||
|
CONNECTOR_CONFIG_PATCHED.putAll(CONNECTOR_CONFIG_PATCH);
|
||||||
|
}
|
||||||
|
|
||||||
private static final Map<String, String> CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>();
|
private static final Map<String, String> CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>();
|
||||||
static {
|
static {
|
||||||
CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1);
|
CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1);
|
||||||
|
@ -588,6 +599,37 @@ public class ConnectorsResourceTest {
|
||||||
assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
|
assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfig() throws Throwable {
|
||||||
|
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
|
||||||
|
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG_PATCHED, CONNECTOR_TASK_NAMES,
|
||||||
|
ConnectorType.SINK))
|
||||||
|
).when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
|
||||||
|
|
||||||
|
connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfigLeaderRedirect() throws Throwable {
|
||||||
|
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
|
||||||
|
expectAndCallbackNotLeaderException(cb)
|
||||||
|
.when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
|
||||||
|
when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/config?forward=false"), eq("PATCH"), isNull(), eq(CONNECTOR_CONFIG_PATCH), any()))
|
||||||
|
.thenReturn(new RestClient.HttpResponse<>(200, new HashMap<>(CONNECTOR_CONFIG_PATCHED), null));
|
||||||
|
|
||||||
|
connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfigNotFound() throws Throwable {
|
||||||
|
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
|
||||||
|
expectAndCallbackException(cb, new NotFoundException("Connector " + CONNECTOR_NAME + " not found"))
|
||||||
|
.when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
|
||||||
|
|
||||||
|
assertThrows(NotFoundException.class, () -> connectorsResource.patchConnectorConfig(
|
||||||
|
CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetConnectorTaskConfigs() throws Throwable {
|
public void testGetConnectorTaskConfigs() throws Throwable {
|
||||||
final ArgumentCaptor<Callback<List<TaskInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
|
final ArgumentCaptor<Callback<List<TaskInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
|
||||||
|
|
|
@ -77,7 +77,9 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static java.util.Collections.emptyList;
|
||||||
import static java.util.Collections.singleton;
|
import static java.util.Collections.singleton;
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
import static java.util.Collections.singletonMap;
|
import static java.util.Collections.singletonMap;
|
||||||
|
@ -751,6 +753,77 @@ public class StandaloneHerderTest {
|
||||||
verifyNoMoreInteractions(connectorConfigCb);
|
verifyNoMoreInteractions(connectorConfigCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfigNotFound() {
|
||||||
|
Map<String, String> connConfigPatch = new HashMap<>();
|
||||||
|
connConfigPatch.put("foo1", "baz1");
|
||||||
|
|
||||||
|
Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
|
||||||
|
herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
|
||||||
|
|
||||||
|
ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
|
||||||
|
verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
|
||||||
|
assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
// Create the connector.
|
||||||
|
Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE);
|
||||||
|
originalConnConfig.put("foo0", "unaffected");
|
||||||
|
originalConnConfig.put("foo1", "will-be-changed");
|
||||||
|
originalConnConfig.put("foo2", "will-be-removed");
|
||||||
|
|
||||||
|
Map<String, String> connConfigPatch = new HashMap<>();
|
||||||
|
connConfigPatch.put("foo1", "changed");
|
||||||
|
connConfigPatch.put("foo2", null);
|
||||||
|
connConfigPatch.put("foo3", "added");
|
||||||
|
|
||||||
|
Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
|
||||||
|
patchedConnConfig.put("foo0", "unaffected");
|
||||||
|
patchedConnConfig.put("foo1", "changed");
|
||||||
|
patchedConnConfig.remove("foo2");
|
||||||
|
patchedConnConfig.put("foo3", "added");
|
||||||
|
|
||||||
|
expectAdd(SourceSink.SOURCE);
|
||||||
|
expectConfigValidation(SourceSink.SOURCE, originalConnConfig, patchedConnConfig);
|
||||||
|
|
||||||
|
expectConnectorStartingWithoutTasks(originalConnConfig, SourceSink.SOURCE);
|
||||||
|
|
||||||
|
herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, createCallback);
|
||||||
|
createCallback.get(1000L, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
expectConnectorStartingWithoutTasks(patchedConnConfig, SourceSink.SOURCE);
|
||||||
|
|
||||||
|
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
|
||||||
|
herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
|
||||||
|
|
||||||
|
Map<String, String> returnedConfig = patchCallback.get(1000L, TimeUnit.SECONDS).result().config();
|
||||||
|
assertEquals(patchedConnConfig, returnedConfig);
|
||||||
|
|
||||||
|
// Also check the returned config when requested.
|
||||||
|
FutureCallback<Map<String, String>> configCallback = new FutureCallback<>();
|
||||||
|
herder.connectorConfig(CONNECTOR_NAME, configCallback);
|
||||||
|
|
||||||
|
Map<String, String> returnedConfig2 = configCallback.get(1000L, TimeUnit.SECONDS);
|
||||||
|
assertEquals(patchedConnConfig, returnedConfig2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectConnectorStartingWithoutTasks(Map<String, String> config, SourceSink sourceSink) {
|
||||||
|
doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
|
||||||
|
final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
onStart.getValue().onCompletion(null, TargetState.STARTED);
|
||||||
|
return true;
|
||||||
|
}).when(worker).startConnector(eq(CONNECTOR_NAME), any(Map.class), any(),
|
||||||
|
eq(herder), eq(TargetState.STARTED), onStart.capture());
|
||||||
|
ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
|
||||||
|
new SourceConnectorConfig(plugins, config, true) :
|
||||||
|
new SinkConnectorConfig(plugins, config);
|
||||||
|
when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
|
||||||
|
.thenReturn(emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutTaskConfigs() {
|
public void testPutTaskConfigs() {
|
||||||
Callback<Void> cb = mock(Callback.class);
|
Callback<Void> cb = mock(Callback.class);
|
||||||
|
|
|
@ -169,4 +169,24 @@ public class ConnectUtilsTest {
|
||||||
assertEquals(expectedClientIdBase, actualClientIdBase);
|
assertEquals(expectedClientIdBase, actualClientIdBase);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPatchConfig() {
|
||||||
|
HashMap<String, String> config = new HashMap<>();
|
||||||
|
config.put("unaffected-key", "unaffected-value");
|
||||||
|
config.put("to-be-changed-key", "to-be-changed-value-old");
|
||||||
|
config.put("to-be-deleted-key", "to-be-deleted-value");
|
||||||
|
|
||||||
|
HashMap<String, String> patch = new HashMap<>();
|
||||||
|
patch.put("to-be-changed-key", "to-be-changed-value-new");
|
||||||
|
patch.put("to-be-deleted-key", null);
|
||||||
|
patch.put("to-be-added-key", "to-be-added-value");
|
||||||
|
|
||||||
|
HashMap<String, String> expectedResult = new HashMap<>();
|
||||||
|
expectedResult.put("unaffected-key", "unaffected-value");
|
||||||
|
expectedResult.put("to-be-changed-key", "to-be-changed-value-new");
|
||||||
|
expectedResult.put("to-be-added-key", "to-be-added-value");
|
||||||
|
|
||||||
|
Map<String, String> result = ConnectUtils.patchConfig(config, patch);
|
||||||
|
assertEquals(expectedResult, result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -271,6 +271,43 @@ abstract class EmbeddedConnect {
|
||||||
"Could not execute PUT request. Error response: " + responseToString(response));
|
"Could not execute PUT request. Error response: " + responseToString(response));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Patch the config of a connector.
|
||||||
|
*
|
||||||
|
* @param connName the name of the connector
|
||||||
|
* @param connConfigPatch the configuration patch
|
||||||
|
* @throws ConnectRestException if the REST API returns error status
|
||||||
|
* @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
|
||||||
|
*/
|
||||||
|
public String patchConnectorConfig(String connName, Map<String, String> connConfigPatch) {
|
||||||
|
String url = endpointForResource(String.format("connectors/%s/config", connName));
|
||||||
|
return doPatchConnectorConfig(url, connConfigPatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a PATCH request with the given connector configuration on the given URL endpoint.
|
||||||
|
*
|
||||||
|
* @param url the full URL of the endpoint that corresponds to the given REST resource
|
||||||
|
* @param connConfigPatch the configuration patch
|
||||||
|
* @throws ConnectRestException if the REST api returns error status
|
||||||
|
* @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
|
||||||
|
*/
|
||||||
|
protected String doPatchConnectorConfig(String url, Map<String, String> connConfigPatch) {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
String content;
|
||||||
|
try {
|
||||||
|
content = mapper.writeValueAsString(connConfigPatch);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ConnectException("Could not serialize connector configuration and execute PUT request");
|
||||||
|
}
|
||||||
|
Response response = requestPatch(url, content);
|
||||||
|
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
|
||||||
|
return responseToString(response);
|
||||||
|
}
|
||||||
|
throw new ConnectRestException(response.getStatus(),
|
||||||
|
"Could not execute PATCH request. Error response: " + responseToString(response));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete an existing connector.
|
* Delete an existing connector.
|
||||||
*
|
*
|
||||||
|
|
|
@ -291,6 +291,7 @@ predicates.IsBar.pattern=bar</code></pre>
|
||||||
<li><code>GET /connectors/{name}</code> - get information about a specific connector</li>
|
<li><code>GET /connectors/{name}</code> - get information about a specific connector</li>
|
||||||
<li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li>
|
<li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li>
|
||||||
<li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li>
|
<li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li>
|
||||||
|
<li><code>PATCH /connectors/{name}/config</code> - patch the configuration parameters for a specific connector, where <code>null</code> values in the JSON body indicates removing of the key from the final configuration</li>
|
||||||
<li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li>
|
<li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li>
|
||||||
<li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector along with their configurations</li>
|
<li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector along with their configurations</li>
|
||||||
<li><code>GET /connectors/{name}/tasks-config</code> - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the <code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> for more details</li>
|
<li><code>GET /connectors/{name}/tasks-config</code> - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the <code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> for more details</li>
|
||||||
|
|
Loading…
Reference in New Issue