KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST API (#6791)

When Connect forwards a REST request from one worker to another, the Authorization header was not forwarded. This commit changes the Connect framework to add include the authorization header when forwarding requests to other workers.

Author: Hai-Dang Dam <damquanghaidang@gmail.com>
Reviewers: Robert Yokota <rayokota@gmail.com>, Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Hai-Dang Dam 2019-06-03 19:06:00 -07:00 committed by Randall Hauch
parent 573152dfa8
commit 1a3fe9aa52
7 changed files with 175 additions and 68 deletions

View File

@ -17,6 +17,8 @@
package org.apache.kafka.connect.rest.basic.auth.extension;
import java.util.regex.Pattern;
import javax.ws.rs.HttpMethod;
import org.apache.kafka.common.config.ConfigException;
import java.io.IOException;
@ -35,18 +37,18 @@ import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.core.Response;
public class JaasBasicAuthFilter implements ContainerRequestFilter {
private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
static final String AUTHORIZATION = "Authorization";
private static final Pattern TASK_REQUEST_PATTERN = Pattern.compile("/?connectors/([^/]+)/tasks/?");
@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
try {
LoginContext loginContext =
new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
requestContext.getHeaderString(AUTHORIZATION)));
loginContext.login();
if (!(requestContext.getMethod().equals(HttpMethod.POST) && TASK_REQUEST_PATTERN.matcher(requestContext.getUriInfo().getPath()).matches())) {
LoginContext loginContext =
new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
requestContext.getHeaderString(AUTHORIZATION)));
loginContext.login();
}
} catch (LoginException | ConfigException e) {
requestContext.abortWith(
Response.status(Response.Status.UNAUTHORIZED)

View File

@ -17,12 +17,15 @@
package org.apache.kafka.connect.rest.basic.auth.extension;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.UriInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
@ -52,6 +55,9 @@ public class JaasBasicAuthFilterTest {
private String previousJaasConfig;
private Configuration previousConfiguration;
@MockStrict
private UriInfo uriInfo;
@Before
public void setup() {
EasyMock.reset(requestContext);
@ -137,7 +143,34 @@ public class JaasBasicAuthFilterTest {
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testPostWithoutAppropriateCredential() throws IOException {
EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
EasyMock.expect(uriInfo.getPath()).andReturn("connectors/connName/tasks");
PowerMock.replayAll();
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testPostNotChangingConnectorTask() throws IOException {
EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
EasyMock.expect(uriInfo.getPath()).andReturn("local:randomport/connectors/connName");
String authHeader = "Basic" + Base64.getEncoder().encodeToString(("user" + ":" + "password").getBytes());
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
.andReturn(authHeader);
requestContext.abortWith(EasyMock.anyObject(Response.class));
EasyMock.expectLastCall();
PowerMock.replayAll();
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
private void setMock(String authorization, String username, String password, boolean exceptionCase) {
EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.GET);
String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
.andReturn(authHeader);
@ -152,7 +185,6 @@ public class JaasBasicAuthFilterTest {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
List<String> lines;
lines = new ArrayList<>();
lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required ");

View File

@ -1202,7 +1202,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
return;
}
String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
RestClient.httpRequest(reconfigUrl, "POST", rawTaskProps, null, config);
RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config);
cb.onCompletion(null, null);
} catch (ConnectException e) {
log.error("Request to leader to reconfigure connector tasks failed", e);

View File

@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@ -50,12 +51,13 @@ public class RestClient {
*
* @param url HTTP connection will be established with this url.
* @param method HTTP method ("GET", "POST", "PUT", etc.)
* @param headers HTTP headers from REST endpoint
* @param requestBodyData Object to serialize as JSON and send in the request body.
* @param responseFormat Expected format of the response to the HTTP request.
* @param <T> The type of the deserialized response to the HTTP request.
* @return The deserialized response to the HTTP request, or null if no data is expected.
*/
public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
public static <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
TypeReference<T> responseFormat, WorkerConfig config) {
HttpClient client;
@ -82,6 +84,8 @@ public class RestClient {
req.method(method);
req.accept("application/json");
req.agent("kafka-connect");
addHeadersToRequest(headers, req);
if (serializedBody != null) {
req.content(new StringContentProvider(serializedBody, StandardCharsets.UTF_8), "application/json");
}
@ -116,6 +120,21 @@ public class RestClient {
}
}
/**
* Extract headers from REST call and add to client request
* @param headers Headers from REST endpoint
* @param req The client request to modify
*/
private static void addHeadersToRequest(HttpHeaders headers, Request req) {
if (headers != null) {
String credentialAuthorization = headers.getHeaderString(HttpHeaders.AUTHORIZATION);
if (credentialAuthorization != null) {
req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
}
}
}
/**
* Convert response parameters from Jetty format (HttpFields)
* @param httpFields

View File

@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
@ -85,7 +86,8 @@ public class ConnectorsResource {
@GET
@Path("/")
public Response listConnectors(
final @Context UriInfo uriInfo
final @Context UriInfo uriInfo,
final @Context HttpHeaders headers
) throws Throwable {
if (uriInfo.getQueryParameters().containsKey("expand")) {
Map<String, Map<String, Object>> out = new HashMap<>();
@ -121,6 +123,7 @@ public class ConnectorsResource {
@POST
@Path("/")
public Response createConnector(final @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers,
final CreateConnectorRequest createRequest) throws Throwable {
// Trim leading and trailing whitespaces from the connector name, replace null with empty string
// if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
@ -132,7 +135,7 @@ public class ConnectorsResource {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.putConnectorConfig(name, configs, false, cb);
Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
URI location = UriBuilder.fromUri("/connectors").path(name).build();
@ -142,19 +145,21 @@ public class ConnectorsResource {
@GET
@Path("/{connector}")
public ConnectorInfo getConnector(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
herder.connectorInfo(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
}
@GET
@Path("/{connector}/config")
public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Map<String, String>> cb = new FutureCallback<>();
herder.connectorConfig(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
}
@GET
@ -166,6 +171,7 @@ public class ConnectorsResource {
@PUT
@Path("/{connector}/config")
public Response putConnectorConfig(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,
final Map<String, String> connectorConfig) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
@ -173,7 +179,7 @@ public class ConnectorsResource {
herder.putConnectorConfig(connector, connectorConfig, true, cb);
Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
"PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
"PUT", headers, connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
Response.ResponseBuilder response;
if (createdInfo.created()) {
URI location = UriBuilder.fromUri("/connectors").path(connector).build();
@ -187,15 +193,16 @@ public class ConnectorsResource {
@POST
@Path("/{connector}/restart")
public void restartConnector(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(connector, cb);
completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward);
completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward);
}
@PUT
@Path("/{connector}/pause")
public Response pauseConnector(@PathParam("connector") String connector) {
public Response pauseConnector(@PathParam("connector") String connector, final @Context HttpHeaders headers) {
herder.pauseConnector(connector);
return Response.accepted().build();
}
@ -210,26 +217,29 @@ public class ConnectorsResource {
@GET
@Path("/{connector}/tasks")
public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
herder.taskConfigs(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() {
}, forward);
}
@POST
@Path("/{connector}/tasks")
public void putTaskConfigs(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,
final List<Map<String, String>> taskConfigs) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
herder.putTaskConfigs(connector, taskConfigs, cb);
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward);
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
}
@GET
@Path("/{connector}/tasks/{task}/status")
public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @PathParam("task") Integer task) throws Throwable {
return herder.taskStatus(new ConnectorTaskId(connector, task));
}
@ -238,20 +248,22 @@ public class ConnectorsResource {
@Path("/{connector}/tasks/{task}/restart")
public void restartTask(final @PathParam("connector") String connector,
final @PathParam("task") Integer task,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
herder.restartTask(taskId, cb);
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward);
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
}
@DELETE
@Path("/{connector}")
public void destroyConnector(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.deleteConnectorConfig(connector, cb);
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
}
// Check whether the connector name from the url matches the one (if there is one) provided in the connectorconfig
@ -271,6 +283,7 @@ public class ConnectorsResource {
private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
String path,
String method,
HttpHeaders headers,
Object body,
TypeReference<U> resultType,
Translator<T, U> translator,
@ -293,7 +306,7 @@ public class ConnectorsResource {
.build()
.toString();
log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
return translator.translate(RestClient.httpRequest(forwardUrl, method, body, resultType, config));
return translator.translate(RestClient.httpRequest(forwardUrl, method, headers, body, resultType, config));
} else {
// we should find the right target for the query within two hops, so if
// we don't, it probably means that a rebalance has taken place.
@ -315,14 +328,14 @@ public class ConnectorsResource {
}
}
private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body,
private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
TypeReference<T> resultType, Boolean forward) throws Throwable {
return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>(), forward);
return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<T>(), forward);
}
private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method,
private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers,
Object body, Boolean forward) throws Throwable {
return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>(), forward);
return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<T>(), forward);
}
private interface Translator<T, U> {

View File

@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -182,7 +183,7 @@ public class ConnectorPluginsResourceTest {
@Before
public void setUp() throws Exception {
PowerMock.mockStatic(RestClient.class,
RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class));
plugins = PowerMock.createMock(Plugins.class);
herder = PowerMock.createMock(AbstractHerder.class);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@ -79,6 +80,7 @@ public class ConnectorsResourceTest {
private static final String CONNECTOR_NAME_PADDING_WHITESPACES = " " + CONNECTOR_NAME + " \n ";
private static final Boolean FORWARD = true;
private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS = new HashMap<>();
private static final HttpHeaders NULL_HEADERS = null;
static {
CONNECTOR_CONFIG_SPECIAL_CHARS.put("name", CONNECTOR_NAME_SPECIAL_CHARS);
CONNECTOR_CONFIG_SPECIAL_CHARS.put("sample_config", "test_config");
@ -130,7 +132,7 @@ public class ConnectorsResourceTest {
@Before
public void setUp() throws NoSuchMethodException {
PowerMock.mockStatic(RestClient.class,
RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class));
connectorsResource = new ConnectorsResource(herder, null);
forward = EasyMock.mock(UriInfo.class);
MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
@ -151,7 +153,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
Collection<String> connectors = (Collection<String>) connectorsResource.listConnectors(forward).getEntity();
Collection<String> connectors = (Collection<String>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity();
// Ordering isn't guaranteed, compare sets
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
@ -174,7 +176,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity();
// Ordering isn't guaranteed, compare sets
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
@ -198,7 +200,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity();
// Ordering isn't guaranteed, compare sets
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("info"));
@ -226,7 +228,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity();
// Ordering isn't guaranteed, compare sets
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet());
assertEquals(connectorInfo2, expanded.get(CONNECTOR2_NAME).get("info"));
@ -252,7 +254,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity();
Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity();
// Ordering isn't guaranteed, compare sets
assertEquals(Collections.singleton(CONNECTOR2_NAME), expanded.keySet());
assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status"));
@ -271,7 +273,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, body);
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
PowerMock.verifyAll();
}
@ -284,19 +286,57 @@ public class ConnectorsResourceTest {
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(201, new HashMap<String, String>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, body);
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorWithHeaderAuthorization() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn("Basic YWxhZGRpbjpvcGVuc2VzYW1l").times(1);
EasyMock.replay(httpHeaders);
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, httpHeaders, body);
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorWithoutHeaderAuthorization() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn(null).times(1);
EasyMock.replay(httpHeaders);
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, httpHeaders, body);
PowerMock.verifyAll();
}
@Test(expected = AlreadyExistsException.class)
public void testCreateConnectorExists() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
@ -307,7 +347,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, body);
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
PowerMock.verifyAll();
}
@ -326,7 +366,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, bodyIn);
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
PowerMock.verifyAll();
}
@ -345,7 +385,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, bodyIn);
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
PowerMock.verifyAll();
}
@ -364,7 +404,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, bodyIn);
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
PowerMock.verifyAll();
}
@ -377,7 +417,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
PowerMock.verifyAll();
}
@ -388,12 +428,12 @@ public class ConnectorsResourceTest {
herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null, null))
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, null))
.andReturn(new RestClient.HttpResponse<>(204, new HashMap<String, String>(), null));
PowerMock.replayAll();
connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
PowerMock.verifyAll();
}
@ -407,7 +447,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
PowerMock.verifyAll();
}
@ -421,7 +461,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, FORWARD);
ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE),
connInfo);
@ -436,7 +476,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
assertEquals(CONNECTOR_CONFIG, connConfig);
PowerMock.verifyAll();
@ -450,7 +490,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
PowerMock.verifyAll();
}
@ -464,7 +504,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, CONNECTOR_CONFIG);
connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG);
PowerMock.verifyAll();
}
@ -480,7 +520,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, decoded);
@ -498,7 +538,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
@ -515,7 +555,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS, FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, decoded);
@ -532,7 +572,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
@ -543,7 +583,7 @@ public class ConnectorsResourceTest {
public void testPutConnectorConfigNameMismatch() throws Throwable {
Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, connConfig);
connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, connConfig);
}
@Test(expected = BadRequestException.class)
@ -551,7 +591,7 @@ public class ConnectorsResourceTest {
Map<String, String> connConfig = new HashMap<>();
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig);
connectorsResource.createConnector(FORWARD, request);
connectorsResource.createConnector(FORWARD, NULL_HEADERS, request);
}
@Test
@ -562,7 +602,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
assertEquals(TASK_INFOS, taskInfos);
PowerMock.verifyAll();
@ -576,7 +616,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
PowerMock.verifyAll();
}
@ -589,7 +629,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS);
connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, TASK_CONFIGS);
PowerMock.verifyAll();
}
@ -602,7 +642,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS);
connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, TASK_CONFIGS);
PowerMock.verifyAll();
}
@ -615,7 +655,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.restartConnector(CONNECTOR_NAME, FORWARD);
connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
PowerMock.verifyAll();
}
@ -627,12 +667,12 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
connectorsResource.restartConnector(CONNECTOR_NAME, null);
connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, null);
PowerMock.verifyAll();
}
@ -645,12 +685,12 @@ public class ConnectorsResourceTest {
expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
connectorsResource.restartConnector(CONNECTOR_NAME, true);
connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, true);
PowerMock.verifyAll();
}
@ -664,7 +704,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
connectorsResource.restartTask(CONNECTOR_NAME, 0, FORWARD);
connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, FORWARD);
PowerMock.verifyAll();
}
@ -678,12 +718,12 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
connectorsResource.restartTask(CONNECTOR_NAME, 0, null);
connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null);
PowerMock.verifyAll();
}
@ -698,12 +738,12 @@ public class ConnectorsResourceTest {
expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
connectorsResource.restartTask(CONNECTOR_NAME, 0, true);
connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true);
PowerMock.verifyAll();
}