mirror of https://github.com/apache/kafka.git
KAFKA-3424: Add CORS support to Connect REST API
Author: Ewen Cheslack-Postava <me@ewencp.org> Reviewers: Gwen Shapira Closes #1099 from ewencp/cors-rest-support
This commit is contained in:
parent
6553679718
commit
eb823281a5
|
@ -749,6 +749,7 @@ project(':connect:runtime') {
|
|||
compile libs.jerseyContainerServlet
|
||||
compile libs.jettyServer
|
||||
compile libs.jettyServlet
|
||||
compile libs.jettyServlets
|
||||
compile libs.reflections
|
||||
|
||||
testCompile project(':clients').sourceSets.test.output
|
||||
|
|
|
@ -101,6 +101,15 @@ public class WorkerConfig extends AbstractConfig {
|
|||
private static final String REST_ADVERTISED_PORT_DOC
|
||||
= "If this is set, this is the port that will be given out to other workers to connect to.";
|
||||
|
||||
public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
|
||||
protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
|
||||
"Value to set the Access-Control-Allow-Origin header to for REST API requests." +
|
||||
"To enable cross origin access, set this to the domain of the application that should be permitted" +
|
||||
" to access the API, or '*' to allow access from any domain. The default value only allows access" +
|
||||
" from the domain of the REST API.";
|
||||
protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";
|
||||
|
||||
|
||||
/**
|
||||
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
|
||||
* bootstrap their own ConfigDef.
|
||||
|
@ -129,7 +138,10 @@ public class WorkerConfig extends AbstractConfig {
|
|||
.define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
|
||||
.define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
|
||||
.define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
|
||||
.define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
|
||||
.define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
|
||||
.define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
|
||||
ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
|
||||
ACCESS_CONTROL_ALLOW_ORIGIN_DOC);
|
||||
}
|
||||
|
||||
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
|
||||
|
|
|
@ -39,8 +39,10 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
|
|||
import org.eclipse.jetty.server.handler.HandlerCollection;
|
||||
import org.eclipse.jetty.server.handler.RequestLogHandler;
|
||||
import org.eclipse.jetty.server.handler.StatisticsHandler;
|
||||
import org.eclipse.jetty.servlet.FilterHolder;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.servlets.CrossOriginFilter;
|
||||
import org.glassfish.jersey.server.ResourceConfig;
|
||||
import org.glassfish.jersey.servlet.ServletContainer;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -52,9 +54,11 @@ import java.io.OutputStream;
|
|||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.servlet.DispatcherType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.UriBuilder;
|
||||
|
||||
|
@ -109,6 +113,14 @@ public class RestServer {
|
|||
context.setContextPath("/");
|
||||
context.addServlet(servletHolder, "/*");
|
||||
|
||||
String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
|
||||
if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
|
||||
FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
|
||||
filterHolder.setName("cross-origin");
|
||||
filterHolder.setInitParameter("allowedOrigins", allowedOrigins);
|
||||
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
|
||||
}
|
||||
|
||||
RequestLogHandler requestLogHandler = new RequestLogHandler();
|
||||
Slf4jRequestLog requestLog = new Slf4jRequestLog();
|
||||
requestLog.setLoggerName(RestServer.class.getCanonicalName());
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.connect.runtime.rest;
|
||||
|
||||
import org.apache.kafka.connect.runtime.Herder;
|
||||
import org.apache.kafka.connect.runtime.WorkerConfig;
|
||||
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
|
||||
import org.apache.kafka.connect.util.Callback;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.IAnswer;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.easymock.PowerMock;
|
||||
import org.powermock.api.easymock.annotation.MockStrict;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
import javax.ws.rs.client.Invocation;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
public class RestServerTest {
|
||||
|
||||
@MockStrict
|
||||
private Herder herder;
|
||||
private RestServer server;
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
private Map<String, String> baseWorkerProps() {
|
||||
Map<String, String> workerProps = new HashMap<>();
|
||||
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
|
||||
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
|
||||
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
|
||||
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
|
||||
workerProps.put("internal.key.converter.schemas.enable", "false");
|
||||
workerProps.put("internal.value.converter.schemas.enable", "false");
|
||||
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
|
||||
return workerProps;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCORSEnabled() {
|
||||
checkCORSRequest("*", "http://bar.com", "http://bar.com");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCORSDisabled() {
|
||||
checkCORSRequest("", "http://bar.com", null);
|
||||
}
|
||||
|
||||
public void checkCORSRequest(String corsDomain, String origin, String expectedHeader) {
|
||||
// To be able to set the Origin, we need to toggle this flag
|
||||
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
|
||||
|
||||
final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
|
||||
herder.connectors(EasyMock.capture(connectorsCallback));
|
||||
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
|
||||
@Override
|
||||
public Object answer() throws Throwable {
|
||||
connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
PowerMock.replayAll();
|
||||
|
||||
Map<String, String> workerProps = baseWorkerProps();
|
||||
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
|
||||
WorkerConfig workerConfig = new StandaloneConfig(workerProps);
|
||||
server = new RestServer(workerConfig);
|
||||
server.start(herder);
|
||||
|
||||
Response response = request("/connectors")
|
||||
.header("Referer", origin + "/page")
|
||||
.header("Origin", origin)
|
||||
.get();
|
||||
assertEquals(200, response.getStatus());
|
||||
|
||||
assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin"));
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
protected Invocation.Builder request(String path) {
|
||||
return request(path, null, null, null);
|
||||
}
|
||||
|
||||
protected Invocation.Builder request(String path, Map<String, String> queryParams) {
|
||||
return request(path, null, null, queryParams);
|
||||
}
|
||||
|
||||
protected Invocation.Builder request(String path, String templateName, Object templateValue) {
|
||||
return request(path, templateName, templateValue, null);
|
||||
}
|
||||
|
||||
protected Invocation.Builder request(String path, String templateName, Object templateValue,
|
||||
Map<String, String> queryParams) {
|
||||
Client client = ClientBuilder.newClient();
|
||||
WebTarget target;
|
||||
URI pathUri = null;
|
||||
try {
|
||||
pathUri = new URI(path);
|
||||
} catch (URISyntaxException e) {
|
||||
// Ignore, use restConnect and assume this is a valid path part
|
||||
}
|
||||
if (pathUri != null && pathUri.isAbsolute()) {
|
||||
target = client.target(path);
|
||||
} else {
|
||||
target = client.target(server.advertisedUrl()).path(path);
|
||||
}
|
||||
if (templateName != null && templateValue != null) {
|
||||
target = target.resolveTemplate(templateName, templateValue);
|
||||
}
|
||||
if (queryParams != null) {
|
||||
for (Map.Entry<String, String> queryParam : queryParams.entrySet()) {
|
||||
target = target.queryParam(queryParam.getKey(), queryParam.getValue());
|
||||
}
|
||||
}
|
||||
return target.request();
|
||||
}
|
||||
}
|
|
@ -72,6 +72,7 @@ libs += [
|
|||
jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
|
||||
jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty",
|
||||
jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty",
|
||||
jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
|
||||
jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
|
||||
junit: "junit:junit:$versions.junit",
|
||||
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
|
||||
|
|
Loading…
Reference in New Issue