KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149)

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Chris Egerton 2024-01-10 09:03:23 -05:00 committed by GitHub
parent 5a0a4c5a54
commit dbf00bcf45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 242 additions and 188 deletions

View File

@ -516,6 +516,9 @@
<allow pkg="kafka.server" /> <allow pkg="kafka.server" />
<subpackage name="rest"> <subpackage name="rest">
<allow pkg="javax.ws.rs" /> <allow pkg="javax.ws.rs" />
<allow pkg="javax.inject" />
<allow pkg="org.glassfish.jersey" />
<allow pkg="org.glassfish.hk2" />
</subpackage> </subpackage>
</subpackage> </subpackage>
@ -530,6 +533,8 @@
<subpackage name="rest"> <subpackage name="rest">
<allow pkg="org.eclipse.jetty" /> <allow pkg="org.eclipse.jetty" />
<allow pkg="javax.ws.rs" /> <allow pkg="javax.ws.rs" />
<allow pkg="javax.inject" />
<allow pkg="org.glassfish.hk2" />
<allow pkg="javax.servlet" /> <allow pkg="javax.servlet" />
<allow pkg="org.glassfish.jersey" /> <allow pkg="org.glassfish.jersey" />
<allow pkg="com.fasterxml.jackson" /> <allow pkg="com.fasterxml.jackson" />

View File

@ -22,7 +22,9 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; import org.glassfish.hk2.api.TypeLiteral;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -45,15 +47,28 @@ public class MirrorRestServer extends RestServer {
} }
@Override @Override
protected Collection<ConnectResource> regularResources() { protected Collection<Class<?>> regularResources() {
return Arrays.asList( return Arrays.asList(
new InternalMirrorResource(herders, restClient) InternalMirrorResource.class
); );
} }
@Override @Override
protected Collection<ConnectResource> adminResources() { protected Collection<Class<?>> adminResources() {
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
protected void configureRegularResources(ResourceConfig resourceConfig) {
resourceConfig.register(new Binder());
}
private class Binder extends AbstractBinder {
@Override
protected void configure() {
bind(herders).to(new TypeLiteral<Map<SourceAndTarget, Herder>>() { });
bind(restClient).to(RestClient.class);
}
}
} }

View File

@ -19,10 +19,12 @@ package org.apache.kafka.connect.mirror.rest.resources;
import org.apache.kafka.connect.mirror.SourceAndTarget; import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.ws.rs.NotFoundException; import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
@ -39,8 +41,13 @@ public class InternalMirrorResource extends InternalClusterResource {
private final Map<SourceAndTarget, Herder> herders; private final Map<SourceAndTarget, Herder> herders;
public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) { @Inject
super(restClient); public InternalMirrorResource(
Map<SourceAndTarget, Herder> herders,
RestClient restClient,
RestRequestTimeout requestTimeout
) {
super(restClient, requestTimeout);
this.herders = herders; this.herders = herders;
} }

View File

@ -66,10 +66,10 @@ import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.Message; import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.sink.SinkTask;
@ -718,7 +718,7 @@ public class Worker {
.map(this::taskTransactionalId) .map(this::taskTransactionalId)
.collect(Collectors.toList()); .collect(Collectors.toList());
FenceProducersOptions fencingOptions = new FenceProducersOptions() FenceProducersOptions fencingOptions = new FenceProducersOptions()
.timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); .timeoutMs((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
return admin.fenceProducers(transactionalIds, fencingOptions).all().whenComplete((ignored, error) -> { return admin.fenceProducers(transactionalIds, fencingOptions).all().whenComplete((ignored, error) -> {
if (error == null) if (error == null)
log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName); log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName);
@ -1195,7 +1195,7 @@ public class Worker {
Admin admin = adminFactory.apply(adminConfig); Admin admin = adminFactory.apply(adminConfig);
try { try {
ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions() ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
.timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); .timeoutMs((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions); ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result, error) -> { listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result, error) -> {
if (error != null) { if (error != null) {
@ -1299,7 +1299,7 @@ public class Worker {
Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> { executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try { try {
Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS)); Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
boolean isReset = offsets == null; boolean isReset = offsets == null;
SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig); SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
Class<? extends Connector> sinkConnectorClass = connector.getClass(); Class<? extends Connector> sinkConnectorClass = connector.getClass();
@ -1530,7 +1530,7 @@ public class Worker {
ClassLoader connectorLoader, Callback<Message> cb) { ClassLoader connectorLoader, Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> { executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try { try {
Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS)); Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
// This reads to the end of the offsets topic and can be a potentially time-consuming operation // This reads to the end of the offsets topic and can be a potentially time-consuming operation
offsetStore.start(); offsetStore.start();
updateTimerAndCheckExpiry(timer, "Timed out while trying to read to the end of the offsets topic prior to modifying " + updateTimerAndCheckExpiry(timer, "Timed out while trying to read to the end of the offsets topic prior to modifying " +

View File

@ -17,12 +17,12 @@
package org.apache.kafka.connect.runtime.rest; package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource; import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource;
import org.apache.kafka.connect.runtime.rest.resources.LoggingResource; import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
import org.apache.kafka.connect.runtime.rest.resources.RootResource; import org.apache.kafka.connect.runtime.rest.resources.RootResource;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.server.ResourceConfig;
import java.util.Arrays; import java.util.Arrays;
@ -45,25 +45,40 @@ public class ConnectRestServer extends RestServer {
} }
@Override @Override
protected Collection<ConnectResource> regularResources() { protected Collection<Class<?>> regularResources() {
return Arrays.asList( return Arrays.asList(
new RootResource(herder), RootResource.class,
new ConnectorsResource(herder, config, restClient), ConnectorsResource.class,
new InternalConnectResource(herder, restClient), InternalConnectResource.class,
new ConnectorPluginsResource(herder) ConnectorPluginsResource.class
); );
} }
@Override @Override
protected Collection<ConnectResource> adminResources() { protected Collection<Class<?>> adminResources() {
return Arrays.asList( return Arrays.asList(
new LoggingResource(herder) LoggingResource.class
); );
} }
@Override @Override
protected void configureRegularResources(ResourceConfig resourceConfig) { protected void configureRegularResources(ResourceConfig resourceConfig) {
registerRestExtensions(herder, resourceConfig); registerRestExtensions(herder, resourceConfig);
resourceConfig.register(new Binder());
}
private class Binder extends AbstractBinder {
@Override
protected void configure() {
bind(herder).to(Herder.class);
bind(restClient).to(RestClient.class);
bind(config).to(RestServerConfig.class);
}
}
@Override
protected void configureAdminResources(ResourceConfig resourceConfig) {
resourceConfig.register(new Binder());
} }
} }

View File

@ -41,18 +41,11 @@ public class HerderRequestHandler {
private final RestClient restClient; private final RestClient restClient;
private volatile long requestTimeoutMs; private final RestRequestTimeout requestTimeout;
public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) { public HerderRequestHandler(RestClient restClient, RestRequestTimeout requestTimeout) {
this.restClient = restClient; this.restClient = restClient;
this.requestTimeoutMs = requestTimeoutMs; this.requestTimeout = requestTimeout;
}
public void requestTimeoutMs(long requestTimeoutMs) {
if (requestTimeoutMs < 1) {
throw new IllegalArgumentException("REST request timeout must be positive");
}
this.requestTimeoutMs = requestTimeoutMs;
} }
/** /**
@ -64,7 +57,7 @@ public class HerderRequestHandler {
*/ */
public <T> T completeRequest(FutureCallback<T> cb) throws Throwable { public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
try { try {
return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS); return cb.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw e.getCause(); throw e.getCause();
} catch (StagedTimeoutException e) { } catch (StagedTimeoutException e) {

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest;
public interface RestRequestTimeout {
/**
* @return the current timeout that should be used for REST requests, in milliseconds
*/
long timeoutMs();
}

View File

@ -27,7 +27,6 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl; import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils; import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog; import org.eclipse.jetty.server.CustomRequestLog;
@ -43,6 +42,8 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.CrossOriginFilter; import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.eclipse.jetty.servlets.HeaderFilter; import org.eclipse.jetty.servlets.HeaderFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.hk2.utilities.Binder;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties; import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.servlet.ServletContainer; import org.glassfish.jersey.servlet.ServletContainer;
@ -59,6 +60,7 @@ import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -66,6 +68,13 @@ import java.util.regex.Pattern;
* Embedded server for the REST API that provides the control plane for Kafka Connect workers. * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
*/ */
public abstract class RestServer { public abstract class RestServer {
// TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
// but currently a worker simply leaving the group can take this long as well.
public static final long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90);
private static final Logger log = LoggerFactory.getLogger(RestServer.class); private static final Logger log = LoggerFactory.getLogger(RestServer.class);
// Used to distinguish between Admin connectors and regular REST API connectors when binding admin handlers // Used to distinguish between Admin connectors and regular REST API connectors when binding admin handlers
@ -80,8 +89,8 @@ public abstract class RestServer {
protected final RestServerConfig config; protected final RestServerConfig config;
private final ContextHandlerCollection handlers; private final ContextHandlerCollection handlers;
private final Server jettyServer; private final Server jettyServer;
private final RequestTimeout requestTimeout;
private Collection<ConnectResource> resources;
private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList(); private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList();
/** /**
@ -95,6 +104,7 @@ public abstract class RestServer {
jettyServer = new Server(); jettyServer = new Server();
handlers = new ContextHandlerCollection(); handlers = new ContextHandlerCollection();
requestTimeout = new RequestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
createConnectors(listeners, adminListeners); createConnectors(listeners, adminListeners);
} }
@ -207,44 +217,31 @@ public abstract class RestServer {
protected final void initializeResources() { protected final void initializeResources() {
log.info("Initializing REST resources"); log.info("Initializing REST resources");
resources = new ArrayList<>();
ResourceConfig resourceConfig = newResourceConfig(); ResourceConfig resourceConfig = newResourceConfig();
resourceConfig.register(new JacksonJsonProvider()); Collection<Class<?>> regularResources = regularResources();
Collection<ConnectResource> regularResources = regularResources();
regularResources.forEach(resourceConfig::register); regularResources.forEach(resourceConfig::register);
resources.addAll(regularResources);
resourceConfig.register(ConnectExceptionMapper.class);
resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
configureRegularResources(resourceConfig); configureRegularResources(resourceConfig);
List<String> adminListeners = config.adminListeners(); List<String> adminListeners = config.adminListeners();
ResourceConfig adminResourceConfig; ResourceConfig adminResourceConfig;
if (adminListeners != null && adminListeners.isEmpty()) {
log.info("Skipping adding admin resources");
// set up adminResource but add no handlers to it
adminResourceConfig = resourceConfig;
} else {
if (adminListeners == null) { if (adminListeners == null) {
log.info("Adding admin resources to main listener"); log.info("Adding admin resources to main listener");
adminResourceConfig = resourceConfig; adminResourceConfig = resourceConfig;
Collection<ConnectResource> adminResources = adminResources(); } else {
resources.addAll(adminResources);
adminResources.forEach(adminResourceConfig::register);
configureAdminResources(adminResourceConfig);
} else if (adminListeners.size() > 0) {
// TODO: we need to check if these listeners are same as 'listeners' // TODO: we need to check if these listeners are same as 'listeners'
// TODO: the following code assumes that they are different // TODO: the following code assumes that they are different
log.info("Adding admin resources to admin listener"); log.info("Adding admin resources to admin listener");
adminResourceConfig = newResourceConfig(); adminResourceConfig = newResourceConfig();
adminResourceConfig.register(new JacksonJsonProvider()); }
Collection<ConnectResource> adminResources = adminResources(); Collection<Class<?>> adminResources = adminResources();
resources.addAll(adminResources);
adminResources.forEach(adminResourceConfig::register); adminResources.forEach(adminResourceConfig::register);
adminResourceConfig.register(ConnectExceptionMapper.class);
configureAdminResources(adminResourceConfig); configureAdminResources(adminResourceConfig);
} else {
log.info("Skipping adding admin resources");
// set up adminResource but add no handlers to it
adminResourceConfig = resourceConfig;
} }
ServletContainer servletContainer = new ServletContainer(resourceConfig); ServletContainer servletContainer = new ServletContainer(resourceConfig);
@ -302,17 +299,26 @@ public abstract class RestServer {
log.info("REST resources initialized; server is started and ready to handle requests"); log.info("REST resources initialized; server is started and ready to handle requests");
} }
/** private ResourceConfig newResourceConfig() {
* @return the {@link ConnectResource resources} that should be registered with the ResourceConfig result = new ResourceConfig();
* standard (i.e., non-admin) listener for this server; may be empty, but not null result.register(new JacksonJsonProvider());
*/ result.register(requestTimeout.binder());
protected abstract Collection<ConnectResource> regularResources(); result.register(ConnectExceptionMapper.class);
result.property(ServerProperties.WADL_FEATURE_DISABLE, true);
return result;
}
/** /**
* @return the {@link ConnectResource resources} that should be registered with the * @return the resources that should be registered with the
* standard (i.e., non-admin) listener for this server; may be empty, but not null
*/
protected abstract Collection<Class<?>> regularResources();
/**
* @return the resources that should be registered with the
* admin listener for this server; may be empty, but not null * admin listener for this server; may be empty, but not null
*/ */
protected abstract Collection<ConnectResource> adminResources(); protected abstract Collection<Class<?>> adminResources();
/** /**
* Pluggable hook to customize the regular (i.e., non-admin) resources on this server * Pluggable hook to customize the regular (i.e., non-admin) resources on this server
@ -438,7 +444,7 @@ public abstract class RestServer {
// For testing only // For testing only
public void requestTimeout(long requestTimeoutMs) { public void requestTimeout(long requestTimeoutMs) {
this.resources.forEach(resource -> resource.requestTimeout(requestTimeoutMs)); this.requestTimeout.timeoutMs(requestTimeoutMs);
} }
String determineAdvertisedProtocol() { String determineAdvertisedProtocol() {
@ -488,7 +494,7 @@ public abstract class RestServer {
config.restExtensions(), config.restExtensions(),
config, ConnectRestExtension.class); config, ConnectRestExtension.class);
long herderRequestTimeoutMs = ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS; long herderRequestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs(); Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs();
@ -520,4 +526,35 @@ public abstract class RestServer {
headerFilterHolder.setInitParameter("headerConfig", headerConfig); headerFilterHolder.setInitParameter("headerConfig", headerConfig);
context.addFilter(headerFilterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); context.addFilter(headerFilterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
} }
private static class RequestTimeout implements RestRequestTimeout {
private final RequestBinder binder;
private volatile long timeoutMs;
public RequestTimeout(long initialTimeoutMs) {
this.timeoutMs = initialTimeoutMs;
this.binder = new RequestBinder();
}
@Override
public long timeoutMs() {
return timeoutMs;
}
public void timeoutMs(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
public Binder binder() {
return binder;
}
private class RequestBinder extends AbstractBinder {
@Override
protected void configure() {
bind(RequestTimeout.this).to(RestRequestTimeout.class);
}
}
}
} }

View File

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest.resources;
import java.util.concurrent.TimeUnit;
/**
* This interface defines shared logic for all Connect REST resources.
*/
public interface ConnectResource {
// TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
// but currently a worker simply leaving the group can take this long as well.
long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90);
/**
* Set how long the resource will await the completion of each request before returning a 500 error.
* If the resource does not perform any operations that can be expected to block under reasonable
* circumstances, this can be implemented as a no-op.
* @param requestTimeoutMs the new timeout in milliseconds; must be positive
*/
void requestTimeout(long requestTimeoutMs);
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.PluginInfo; import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
@ -30,6 +31,7 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.Stage; import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.StagedTimeoutException; import org.apache.kafka.connect.util.StagedTimeoutException;
import javax.inject.Inject;
import javax.ws.rs.BadRequestException; import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue; import javax.ws.rs.DefaultValue;
@ -56,17 +58,18 @@ import java.util.stream.Collectors;
@Path("/connector-plugins") @Path("/connector-plugins")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON)
public class ConnectorPluginsResource implements ConnectResource { public class ConnectorPluginsResource {
private static final String ALIAS_SUFFIX = "Connector"; private static final String ALIAS_SUFFIX = "Connector";
private final Herder herder; private final Herder herder;
private final Set<PluginInfo> connectorPlugins; private final Set<PluginInfo> connectorPlugins;
private long requestTimeoutMs; private final RestRequestTimeout requestTimeout;
public ConnectorPluginsResource(Herder herder) { @Inject
public ConnectorPluginsResource(Herder herder, RestRequestTimeout requestTimeout) {
this.herder = herder; this.herder = herder;
this.requestTimeout = requestTimeout;
this.connectorPlugins = new LinkedHashSet<>(); this.connectorPlugins = new LinkedHashSet<>();
this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
// TODO: improve once plugins are allowed to be added/removed during runtime. // TODO: improve once plugins are allowed to be added/removed during runtime.
addConnectorPlugins(herder.plugins().sinkConnectors()); addConnectorPlugins(herder.plugins().sinkConnectors());
@ -83,11 +86,6 @@ public class ConnectorPluginsResource implements ConnectResource {
.forEach(connectorPlugins::add); .forEach(connectorPlugins::add);
} }
@Override
public void requestTimeout(long requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
}
@PUT @PUT
@Path("/{pluginName}/config/validate") @Path("/{pluginName}/config/validate")
@Operation(summary = "Validate the provided configuration against the configuration definition for the specified pluginName") @Operation(summary = "Validate the provided configuration against the configuration definition for the specified pluginName")
@ -109,7 +107,7 @@ public class ConnectorPluginsResource implements ConnectResource {
herder.validateConnectorConfig(connectorConfig, validationCallback, false); herder.validateConnectorConfig(connectorConfig, validationCallback, false);
try { try {
return validationCallback.get(requestTimeoutMs, TimeUnit.MILLISECONDS); return validationCallback.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS);
} catch (StagedTimeoutException e) { } catch (StagedTimeoutException e) {
Stage stage = e.stage(); Stage stage = e.stage();
String message; String message;
@ -136,7 +134,6 @@ public class ConnectorPluginsResource implements ConnectResource {
} }
@GET @GET
@Path("/")
@Operation(summary = "List all connector plugins installed") @Operation(summary = "List all connector plugins installed")
public List<PluginInfo> listConnectorPlugins( public List<PluginInfo> listConnectorPlugins(
@DefaultValue("true") @QueryParam("connectorsOnly") @Parameter(description = "Whether to list only connectors instead of all plugins") boolean connectorsOnly @DefaultValue("true") @QueryParam("connectorsOnly") @Parameter(description = "Whether to list only connectors instead of all plugins") boolean connectorsOnly

View File

@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.rest.HerderRequestHandler; import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@ -39,6 +40,7 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.ws.rs.BadRequestException; import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
@ -70,7 +72,7 @@ import static org.apache.kafka.connect.runtime.rest.HerderRequestHandler.Transla
@Path("/connectors") @Path("/connectors")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON)
public class ConnectorsResource implements ConnectResource { public class ConnectorsResource {
private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class); private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
private final Herder herder; private final Herder herder;
@ -80,20 +82,20 @@ public class ConnectorsResource implements ConnectResource {
private final boolean isTopicTrackingDisabled; private final boolean isTopicTrackingDisabled;
private final boolean isTopicTrackingResetDisabled; private final boolean isTopicTrackingResetDisabled;
public ConnectorsResource(Herder herder, RestServerConfig config, RestClient restClient) { @Inject
public ConnectorsResource(
Herder herder,
RestServerConfig config,
RestClient restClient,
RestRequestTimeout requestTimeout
) {
this.herder = herder; this.herder = herder;
this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS); this.requestHandler = new HerderRequestHandler(restClient, requestTimeout);
this.isTopicTrackingDisabled = !config.topicTrackingEnabled(); this.isTopicTrackingDisabled = !config.topicTrackingEnabled();
this.isTopicTrackingResetDisabled = !config.topicTrackingResetEnabled(); this.isTopicTrackingResetDisabled = !config.topicTrackingResetEnabled();
} }
@Override
public void requestTimeout(long requestTimeoutMs) {
requestHandler.requestTimeoutMs(requestTimeoutMs);
}
@GET @GET
@Path("/")
@Operation(summary = "List all active connectors") @Operation(summary = "List all active connectors")
public Response listConnectors( public Response listConnectors(
final @Context UriInfo uriInfo, final @Context UriInfo uriInfo,
@ -131,7 +133,6 @@ public class ConnectorsResource implements ConnectResource {
} }
@POST @POST
@Path("/")
@Operation(summary = "Create a new connector") @Operation(summary = "Create a new connector")
public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers, final @Context HttpHeaders headers,

View File

@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.rest.HerderRequestHandler; import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.util.FutureCallback; import org.apache.kafka.connect.util.FutureCallback;
import javax.ws.rs.POST; import javax.ws.rs.POST;
@ -45,7 +46,7 @@ import java.util.Map;
* requests that originate from a user and are forwarded from one worker to another. * requests that originate from a user and are forwarded from one worker to another.
*/ */
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public abstract class InternalClusterResource implements ConnectResource { public abstract class InternalClusterResource {
private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE = private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
new TypeReference<List<Map<String, String>>>() { }; new TypeReference<List<Map<String, String>>>() { };
@ -56,13 +57,8 @@ public abstract class InternalClusterResource implements ConnectResource {
@Context @Context
UriInfo uriInfo; UriInfo uriInfo;
protected InternalClusterResource(RestClient restClient) { protected InternalClusterResource(RestClient restClient, RestRequestTimeout requestTimeout) {
this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS); this.requestHandler = new HerderRequestHandler(restClient, requestTimeout);
}
@Override
public void requestTimeout(long requestTimeoutMs) {
requestHandler.requestTimeoutMs(requestTimeoutMs);
} }
/** /**

View File

@ -18,7 +18,9 @@ package org.apache.kafka.connect.runtime.rest.resources;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import javax.inject.Inject;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@Path("/connectors") @Path("/connectors")
@ -26,8 +28,9 @@ public class InternalConnectResource extends InternalClusterResource {
private final Herder herder; private final Herder herder;
public InternalConnectResource(Herder herder, RestClient restClient) { @Inject
super(restClient); public InternalConnectResource(Herder herder, RestClient restClient, RestRequestTimeout requestTimeout) {
super(restClient, requestTimeout);
this.herder = herder; this.herder = herder;
} }

View File

@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue; import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET; import javax.ws.rs.GET;
@ -46,7 +47,7 @@ import java.util.Objects;
@Path("/admin/loggers") @Path("/admin/loggers")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON)
public class LoggingResource implements ConnectResource { public class LoggingResource {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(LoggingResource.class); private static final org.slf4j.Logger log = LoggerFactory.getLogger(LoggingResource.class);
@ -55,22 +56,17 @@ public class LoggingResource implements ConnectResource {
private final Herder herder; private final Herder herder;
@Inject
public LoggingResource(Herder herder) { public LoggingResource(Herder herder) {
this.herder = herder; this.herder = herder;
} }
@Override
public void requestTimeout(long requestTimeoutMs) {
// No-op
}
/** /**
* List the current loggers that have their levels explicitly set and their log levels. * List the current loggers that have their levels explicitly set and their log levels.
* *
* @return a list of current loggers and their levels. * @return a list of current loggers and their levels.
*/ */
@GET @GET
@Path("/")
@Operation(summary = "List the current loggers that have their levels explicitly set and their log levels") @Operation(summary = "List the current loggers that have their levels explicitly set and their log levels")
public Response listLoggers() { public Response listLoggers() {
return Response.ok(herder.allLoggerLevels()).build(); return Response.ok(herder.allLoggerLevels()).build();

View File

@ -20,6 +20,7 @@ import io.swagger.v3.oas.annotations.Operation;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import javax.inject.Inject;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
@ -27,21 +28,16 @@ import javax.ws.rs.core.MediaType;
@Path("/") @Path("/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public class RootResource implements ConnectResource { public class RootResource {
private final Herder herder; private final Herder herder;
@Inject
public RootResource(Herder herder) { public RootResource(Herder herder) {
this.herder = herder; this.herder = herder;
} }
@Override
public void requestTimeout(long requestTimeoutMs) {
// No-op
}
@GET @GET
@Path("/")
@Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to") @Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to")
public ServerInfo serverInfo() { public ServerInfo serverInfo() {
return new ServerInfo(herder.kafkaClusterId()); return new ServerInfo(herder.kafkaClusterId());

View File

@ -27,7 +27,6 @@ import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.sink.SinkTask;
@ -63,6 +62,7 @@ import java.util.stream.IntStream;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -379,7 +379,7 @@ public class BlockingConnectorTest {
); );
} }
// Reset the REST request timeout so that other requests aren't impacted // Reset the REST request timeout so that other requests aren't impacted
connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
} }
private static class Block { private static class Block {

View File

@ -63,7 +63,7 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POL
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.resources.ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS; import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;

View File

@ -61,10 +61,10 @@ import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.Message; import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkRecord;
@ -2417,7 +2417,7 @@ public class WorkerTest {
// Expect the call to Admin::deleteConsumerGroups to have a timeout value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS // Expect the call to Admin::deleteConsumerGroups to have a timeout value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS
// minus the delay introduced in the call to Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call to // minus the delay introduced in the call to Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call to
// SinkConnector::alterOffsets (3000 ms) // SinkConnector::alterOffsets (3000 ms)
assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L - 3000L, assertEquals((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L - 3000L,
deleteConsumerGroupsOptionsArgumentCaptor.getValue().timeoutMs().intValue()); deleteConsumerGroupsOptionsArgumentCaptor.getValue().timeoutMs().intValue());
verify(admin, timeout(1000)).close(); verify(admin, timeout(1000)).close();
verifyKafkaClusterId(); verifyKafkaClusterId();
@ -2469,7 +2469,7 @@ public class WorkerTest {
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> { when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> {
time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
return true; return true;
}); });
ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
@ -2507,7 +2507,7 @@ public class WorkerTest {
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> { when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> {
time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
return true; return true;
}); });

View File

@ -40,6 +40,7 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -62,13 +63,13 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.StrictStubs.class) @RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ConnectRestServerTest { public class ConnectRestServerTest {
private Herder herder; @Mock private RestClient restClient;
private Plugins plugins; @Mock private Herder herder;
@Mock private Plugins plugins;
private ConnectRestServer server; private ConnectRestServer server;
private CloseableHttpClient httpClient; private CloseableHttpClient httpClient;
private Collection<CloseableHttpResponse> responses = new ArrayList<>(); private Collection<CloseableHttpResponse> responses = new ArrayList<>();
@ -77,8 +78,6 @@ public class ConnectRestServerTest {
@Before @Before
public void setUp() { public void setUp() {
herder = mock(Herder.class);
plugins = mock(Plugins.class);
httpClient = HttpClients.createMinimal(); httpClient = HttpClients.createMinimal();
} }
@ -117,7 +116,7 @@ public class ConnectRestServerTest {
Map<String, String> configMap = new HashMap<>(baseServerProps()); Map<String, String> configMap = new HashMap<>(baseServerProps());
configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443"); configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString()); Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString());
server.stop(); server.stop();
@ -126,7 +125,7 @@ public class ConnectRestServerTest {
configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443"); configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https"); configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString()); Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
server.stop(); server.stop();
@ -134,7 +133,7 @@ public class ConnectRestServerTest {
configMap = new HashMap<>(baseServerProps()); configMap = new HashMap<>(baseServerProps());
configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443"); configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443");
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString()); Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
server.stop(); server.stop();
@ -145,7 +144,7 @@ public class ConnectRestServerTest {
configMap.put(RestServerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost"); configMap.put(RestServerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost");
configMap.put(RestServerConfig.REST_ADVERTISED_PORT_CONFIG, "10000"); configMap.put(RestServerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString()); Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
server.stop(); server.stop();
@ -154,7 +153,7 @@ public class ConnectRestServerTest {
configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761"); configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761");
configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http"); configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("http://plaintext-localhost:4761/", server.advertisedUrl().toString()); Assert.assertEquals("http://plaintext-localhost:4761/", server.advertisedUrl().toString());
server.stop(); server.stop();
} }
@ -167,7 +166,7 @@ public class ConnectRestServerTest {
doReturn(plugins).when(herder).plugins(); doReturn(plugins).when(herder).plugins();
expectEmptyRestExtensions(); expectEmptyRestExtensions();
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
@ -194,7 +193,7 @@ public class ConnectRestServerTest {
expectEmptyRestExtensions(); expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors(); doReturn(Arrays.asList("a", "b")).when(herder).connectors();
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
URI serverUrl = server.advertisedUrl(); URI serverUrl = server.advertisedUrl();
@ -237,7 +236,7 @@ public class ConnectRestServerTest {
expectEmptyRestExtensions(); expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors(); doReturn(Arrays.asList("a", "b")).when(herder).connectors();
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors"); HttpRequest request = new HttpGet("/connectors");
@ -260,7 +259,7 @@ public class ConnectRestServerTest {
doReturn(Collections.emptyList()).when(herder).setWorkerLoggerLevel(logger, loggingLevel); doReturn(Collections.emptyList()).when(herder).setWorkerLoggerLevel(logger, loggingLevel);
doReturn(Collections.singletonMap(logger, new LoggerLevel(loggingLevel, lastModified))).when(herder).allLoggerLevels(); doReturn(Collections.singletonMap(logger, new LoggerLevel(loggingLevel, lastModified))).when(herder).allLoggerLevels();
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
@ -295,7 +294,7 @@ public class ConnectRestServerTest {
LoggerFactory.getLogger("a.b.c.p.Y"); LoggerFactory.getLogger("a.b.c.p.Y");
LoggerFactory.getLogger("a.b.c.p.Z"); LoggerFactory.getLogger("a.b.c.p.Z");
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
@ -317,7 +316,7 @@ public class ConnectRestServerTest {
doReturn(plugins).when(herder).plugins(); doReturn(plugins).when(herder).plugins();
expectEmptyRestExtensions(); expectEmptyRestExtensions();
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
@ -336,7 +335,7 @@ public class ConnectRestServerTest {
doReturn(plugins).when(herder).plugins(); doReturn(plugins).when(herder).plugins();
expectEmptyRestExtensions(); expectEmptyRestExtensions();
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
@ -382,7 +381,7 @@ public class ConnectRestServerTest {
expectEmptyRestExtensions(); expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors(); doReturn(Arrays.asList("a", "b")).when(herder).connectors();
server = new ConnectRestServer(null, null, configMap); server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer(); server.initializeServer();
server.initializeResources(herder); server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors"); HttpRequest request = new HttpGet("/connectors");

View File

@ -39,6 +39,8 @@ import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@ -208,7 +210,8 @@ public class ConnectorPluginsResourceTest {
doReturn(HEADER_CONVERTER_PLUGINS).when(plugins).headerConverters(); doReturn(HEADER_CONVERTER_PLUGINS).when(plugins).headerConverters();
doReturn(TRANSFORMATION_PLUGINS).when(plugins).transformations(); doReturn(TRANSFORMATION_PLUGINS).when(plugins).transformations();
doReturn(PREDICATE_PLUGINS).when(plugins).predicates(); doReturn(PREDICATE_PLUGINS).when(plugins).predicates();
connectorPluginsResource = new ConnectorPluginsResource(herder); RestRequestTimeout requestTimeout = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
connectorPluginsResource = new ConnectorPluginsResource(herder, requestTimeout);
} }
@Test @Test

View File

@ -28,6 +28,8 @@ import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException; import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@ -146,6 +148,8 @@ public class ConnectorsResourceTest {
private static final Set<String> CONNECTOR2_ACTIVE_TOPICS = new HashSet<>( private static final Set<String> CONNECTOR2_ACTIVE_TOPICS = new HashSet<>(
Arrays.asList("foo_topic", "baz_topic")); Arrays.asList("foo_topic", "baz_topic"));
private static final RestRequestTimeout REQUEST_TIMEOUT = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
@Mock @Mock
private Herder herder; private Herder herder;
private ConnectorsResource connectorsResource; private ConnectorsResource connectorsResource;
@ -159,7 +163,7 @@ public class ConnectorsResourceTest {
public void setUp() throws NoSuchMethodException { public void setUp() throws NoSuchMethodException {
when(serverConfig.topicTrackingEnabled()).thenReturn(true); when(serverConfig.topicTrackingEnabled()).thenReturn(true);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(true); when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT);
forward = mock(UriInfo.class); forward = mock(UriInfo.class);
MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
queryParams.putSingle("forward", "true"); queryParams.putSingle("forward", "true");
@ -742,7 +746,7 @@ public class ConnectorsResourceTest {
public void testConnectorActiveTopicsWithTopicTrackingDisabled() { public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
when(serverConfig.topicTrackingEnabled()).thenReturn(false); when(serverConfig.topicTrackingEnabled()).thenReturn(false);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(false); when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT);
Exception e = assertThrows(ConnectRestException.class, Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME)); () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
@ -754,7 +758,7 @@ public class ConnectorsResourceTest {
when(serverConfig.topicTrackingEnabled()).thenReturn(false); when(serverConfig.topicTrackingEnabled()).thenReturn(false);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(true); when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
HttpHeaders headers = mock(HttpHeaders.class); HttpHeaders headers = mock(HttpHeaders.class);
connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT);
Exception e = assertThrows(ConnectRestException.class, Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@ -766,7 +770,7 @@ public class ConnectorsResourceTest {
when(serverConfig.topicTrackingEnabled()).thenReturn(true); when(serverConfig.topicTrackingEnabled()).thenReturn(true);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(false); when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
HttpHeaders headers = mock(HttpHeaders.class); HttpHeaders headers = mock(HttpHeaders.class);
connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT);
Exception e = assertThrows(ConnectRestException.class, Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@ -779,7 +783,7 @@ public class ConnectorsResourceTest {
when(serverConfig.topicTrackingResetEnabled()).thenReturn(true); when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
when(herder.connectorActiveTopics(CONNECTOR_NAME)) when(herder.connectorActiveTopics(CONNECTOR_NAME))
.thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS));
connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT);
Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME); Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
@ -792,7 +796,7 @@ public class ConnectorsResourceTest {
@Test @Test
public void testResetConnectorActiveTopics() { public void testResetConnectorActiveTopics() {
HttpHeaders headers = mock(HttpHeaders.class); HttpHeaders headers = mock(HttpHeaders.class);
connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT);
Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers); Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME); verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME);

View File

@ -21,6 +21,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.Callback;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -74,7 +75,7 @@ public class InternalConnectResourceTest {
@Before @Before
public void setup() { public void setup() {
internalResource = new InternalConnectResource(herder, restClient); internalResource = new InternalConnectResource(herder, restClient, () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
internalResource.uriInfo = uriInfo; internalResource.uriInfo = uriInfo;
} }