KAFKA-10816: Add health check endpoint for Kafka Connect (#16477)

Reviewers: Greg Harris <gharris1727@gmail.com>
Author: Chris Egerton, 2024-07-03 20:15:15 +02:00 (committed by GitHub)
parent 4550550c7c
commit 27220d146c
31 changed files with 911 additions and 156 deletions
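
This commit implements KIP-1017 by adding a GET /health endpoint to every Connect worker's REST API: 200 when the worker is healthy, 503 while it is still starting up, and 500 when it cannot service requests. A minimal sketch of probing the endpoint with the JDK HTTP client follows; the localhost:8083 address is an assumption, not part of this commit:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HealthProbe {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Assumes a Connect worker listening on the default REST port
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/health"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // 200 -> healthy, 503 -> starting, 500 -> unhealthy
        System.out.println(response.statusCode() + ": " + response.body());
    }
}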

View File: org.apache.kafka.common.utils.Time

@ -17,8 +17,8 @@
package org.apache.kafka.common.utils;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@ -98,7 +98,7 @@ public interface Time {
* @param <T> The type of the future.
*/
default <T> T waitForFuture(
CompletableFuture<T> future,
Future<T> future,
long deadlineNs
) throws TimeoutException, InterruptedException, ExecutionException {
TimeoutException timeoutException = null;
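
The signature is widened from CompletableFuture to the more general Future so that callers such as the new health check endpoint can wait on any future with a deadline. A minimal sketch of the call pattern, assuming an ExecutorService named executor and a Time instance named time are in scope:

Future<String> future = executor.submit(() -> "done");
long deadlineNs = time.nanoseconds() + TimeUnit.SECONDS.toNanos(10);
// Throws TimeoutException if the future has not completed by the deadline
String result = time.waitForFuture(future, deadlineNs);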

View File: org.apache.kafka.connect.cli.AbstractConnectCli

@ -40,9 +40,10 @@ import java.util.Map;
/**
* Common initialization logic for Kafka Connect, intended for use by command line utilities
*
* @param <H> the type of {@link Herder} to be used
* @param <T> the type of {@link WorkerConfig} to be used
*/
public abstract class AbstractConnectCli<T extends WorkerConfig> {
public abstract class AbstractConnectCli<H extends Herder, T extends WorkerConfig> {
private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
private final String[] args;
@ -52,7 +53,7 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
*
* @param args the CLI arguments to be processed. Note that if one or more arguments are passed, the first argument is
* assumed to be the Connect worker properties file and is processed in {@link #run()}. The remaining arguments
* can be handled in {@link #processExtraArgs(Herder, Connect, String[])}
* can be handled in {@link #processExtraArgs(Connect, String[])}
*/
protected AbstractConnectCli(String... args) {
this.args = args;
@ -64,15 +65,14 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
* The first CLI argument is assumed to be the Connect worker properties file and is processed by default. This method
* can be overridden if there are more arguments that need to be processed.
*
* @param herder the {@link Herder} instance that can be used to perform operations on the Connect cluster
* @param connect the {@link Connect} instance that can be stopped (via {@link Connect#stop()}) if there's an error
* encountered while processing the additional CLI arguments.
* @param extraArgs the extra CLI arguments that need to be processed
*/
protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
public void processExtraArgs(Connect<H> connect, String[] extraArgs) {
}
protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
protected abstract H createHerder(T config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient);
@ -92,7 +92,8 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
Connect connect = startConnect(workerProps, extraArgs);
Connect<H> connect = startConnect(workerProps);
processExtraArgs(connect, extraArgs);
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
@ -107,11 +108,9 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
* Initialize and start an instance of {@link Connect}
*
* @param workerProps the worker properties map used to initialize the {@link WorkerConfig}
* @param extraArgs any additional CLI arguments that may need to be processed via
* {@link #processExtraArgs(Herder, Connect, String[])}
* @return a started instance of {@link Connect}
*/
public Connect startConnect(Map<String, String> workerProps, String... extraArgs) {
public Connect<H> startConnect(Map<String, String> workerProps) {
log.info("Kafka Connect worker initializing ...");
long initStart = time.hiResClockMs();
@ -136,9 +135,9 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
config, ConnectorClientConfigOverridePolicy.class);
Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
H herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
final Connect connect = new Connect(herder, restServer);
final Connect<H> connect = new Connect<>(herder, restServer);
log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
@ -148,8 +147,6 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
Exit.exit(3);
}
processExtraArgs(herder, connect, extraArgs);
return connect;
}
}
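
With extra-argument handling moved out of startConnect(Map) and into run(), a concrete CLI drives the whole lifecycle through run(). Roughly, mirroring the main methods of the subclasses below:

public static void main(String[] args) {
    ConnectDistributed cli = new ConnectDistributed(args);
    // Loads worker properties, calls startConnect(), then processExtraArgs(), then awaits shutdown
    cli.run();
}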

View File: org.apache.kafka.connect.cli.ConnectDistributed

@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@ -52,7 +51,7 @@ import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
* stopping worker instances.
* </p>
*/
public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
public class ConnectDistributed extends AbstractConnectCli<DistributedHerder, DistributedConfig> {
public ConnectDistributed(String... args) {
super(args);
@ -64,7 +63,7 @@ public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
}
@Override
protected Herder createHerder(DistributedConfig config, String workerId, Plugins plugins,
protected DistributedHerder createHerder(DistributedConfig config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient) {

View File: org.apache.kafka.connect.cli.ConnectStandalone

@ -65,7 +65,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
* since it uses file storage (configurable via {@link StandaloneConfig#OFFSET_STORAGE_FILE_FILENAME_CONFIG})
* </p>
*/
public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
public class ConnectStandalone extends AbstractConnectCli<StandaloneHerder, StandaloneConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
public ConnectStandalone(String... args) {
@ -78,7 +78,7 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
}
@Override
protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
public void processExtraArgs(Connect<StandaloneHerder> connect, String[] extraArgs) {
try {
for (final String connectorConfigFile : extraArgs) {
CreateConnectorRequest createConnectorRequest = parseConnectorConfigurationFile(connectorConfigFile);
@ -88,12 +88,13 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
else
log.info("Created connector {}", info.result().name());
});
herder.putConnectorConfig(
connect.herder().putConnectorConfig(
createConnectorRequest.name(), createConnectorRequest.config(),
createConnectorRequest.initialTargetState(),
false, cb);
cb.get();
}
connect.herder().ready();
} catch (Throwable t) {
log.error("Stopping after connector error", t);
connect.stop();
@ -160,7 +161,7 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
}
@Override
protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
protected StandaloneHerder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient) {

View File: org.apache.kafka.connect.runtime.AbstractHerder

@ -132,8 +132,8 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private final String kafkaClusterId;
protected final StatusBackingStore statusBackingStore;
protected final ConfigBackingStore configBackingStore;
private volatile boolean ready = false;
private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
protected volatile boolean running = false;
private final ExecutorService connectorExecutor;
private final Time time;
protected final Loggers loggers;
@ -180,9 +180,13 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
Utils.closeQuietly(this.connectorClientConfigOverridePolicy, "connector client config override policy");
}
protected void ready() {
this.ready = true;
}
@Override
public boolean isRunning() {
return running;
public boolean isReady() {
return ready;
}
@Override

View File: org.apache.kafka.connect.runtime.Connect

@ -30,23 +30,27 @@ import java.util.concurrent.atomic.AtomicBoolean;
* This class ties together all the components of a Kafka Connect process (herder, worker,
* storage, command interface), managing their lifecycle.
*/
public class Connect {
public class Connect<H extends Herder> {
private static final Logger log = LoggerFactory.getLogger(Connect.class);
private final Herder herder;
private final H herder;
private final ConnectRestServer rest;
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook;
public Connect(Herder herder, ConnectRestServer rest) {
public Connect(H herder, ConnectRestServer rest) {
log.debug("Kafka Connect instance created");
this.herder = herder;
this.rest = rest;
shutdownHook = new ShutdownHook();
}
public H herder() {
return herder;
}
public void start() {
try {
log.info("Kafka Connect starting");
@ -85,10 +89,6 @@ public class Connect {
}
}
public boolean isRunning() {
return herder.isRunning();
}
// Visible for testing
public RestServer rest() {
return rest;
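
Parameterizing Connect over its Herder type, together with the new herder() accessor, replaces the removed isRunning() pass-through and spares callers a cast. A sketch mirroring ConnectStandalone's usage above, assuming cli is a ConnectStandalone and workerProps its worker configuration:

Connect<StandaloneHerder> connect = cli.startConnect(workerProps);
connect.herder().ready(); // standalone-specific calls no longer require a cast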

View File: org.apache.kafka.connect.runtime.Herder

@ -64,7 +64,18 @@ public interface Herder {
void stop();
boolean isRunning();
/**
* @return whether the worker is ready; i.e., it has completed all initialization and startup
* steps such as creating internal topics, joining a cluster, etc.
*/
boolean isReady();
/**
* Check for worker health; i.e., its ability to service external requests from the user such
* as creating, reconfiguring, and deleting connectors
* @param callback callback to invoke once worker health is assured
*/
void healthCheck(Callback<Void> callback);
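
A minimal sketch of calling the new API, using FutureCallback (which implements both Callback and Future) and an assumed 10-second timeout:

FutureCallback<Void> callback = new FutureCallback<>();
herder.healthCheck(callback);
// Completes normally once the worker is able to service requests
callback.get(10, TimeUnit.SECONDS);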
/**
* Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered

View File: org.apache.kafka.connect.runtime.distributed.DistributedHerder

@ -372,15 +372,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.info("Herder starting");
herderThread = Thread.currentThread();
try (TickThreadStage stage = new TickThreadStage("reading to the end of internal topics")) {
try (TickThreadStage stage = new TickThreadStage("initializing and reading to the end of internal topics")) {
startServices();
}
log.info("Herder started");
running = true;
while (!stopping.get()) {
tick();
if (!isReady()) {
ready();
log.info("Herder started");
}
}
recordTickThreadStage("shutting down");
@ -391,8 +393,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.error("Uncaught exception in herder work thread, exiting: ", t);
Utils.closeQuietly(this::stopServices, "herder services");
Exit.exit(1);
} finally {
running = false;
}
}
@ -848,7 +848,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
ThreadUtils.shutdownExecutorServiceQuietly(forwardRequestExecutor, FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(startAndStopExecutor, START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
log.info("Herder stopped");
running = false;
}
@Override
public void healthCheck(Callback<Void> callback) {
addRequest(
() -> {
callback.onCompletion(null, null);
return null;
},
forwardErrorAndTickThreadStages(callback)
);
}
@Override

View File: org.apache.kafka.connect.runtime.rest.HerderRequestHandler

@ -28,7 +28,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -64,18 +63,8 @@ public class HerderRequestHandler {
} catch (ExecutionException e) {
throw e.getCause();
} catch (StagedTimeoutException e) {
String message;
Stage stage = e.stage();
if (stage.completed() != null) {
message = "Request timed out. The last operation the worker completed was "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started()) + " and completed at "
+ Instant.ofEpochMilli(stage.completed());
} else {
message = "Request timed out. The worker is currently "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started());
}
String message = "Request timed out. " + stage.summarize();
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
// error is the best option
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), message);

View File: org.apache.kafka.connect.runtime.rest.RestRequestTimeout

@ -23,4 +23,23 @@ public interface RestRequestTimeout {
*/
long timeoutMs();
/**
* @return the current timeout that should be used for health check REST requests, in milliseconds
*/
long healthCheckTimeoutMs();
static RestRequestTimeout constant(long timeoutMs, long healthCheckTimeoutMs) {
return new RestRequestTimeout() {
@Override
public long timeoutMs() {
return timeoutMs;
}
@Override
public long healthCheckTimeoutMs() {
return healthCheckTimeoutMs;
}
};
}
}
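
The new constant(long, long) factory gives tests and simple wiring a fixed pair of timeouts, e.g.:

RestRequestTimeout timeouts = RestRequestTimeout.constant(90_000L, 10_000L);
long regularMs = timeouts.timeoutMs();            // 90 seconds for ordinary REST requests
long healthMs = timeouts.healthCheckTimeoutMs();  // 10 seconds for health check requests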

View File: org.apache.kafka.connect.runtime.rest.RestServer

@ -77,6 +77,7 @@ public abstract class RestServer {
// we need to consider all possible scenarios in which this could fail. It might be OK to fail with a timeout in rare cases,
// but currently a worker simply leaving the group can take this long as well.
public static final long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90);
public static final long DEFAULT_HEALTH_CHECK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
private static final Logger log = LoggerFactory.getLogger(RestServer.class);
@ -107,7 +108,7 @@ public abstract class RestServer {
jettyServer = new Server();
handlers = new ContextHandlerCollection();
requestTimeout = new RequestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
requestTimeout = new RequestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS, DEFAULT_HEALTH_CHECK_TIMEOUT_MS);
createConnectors(listeners, adminListeners);
}
@ -450,6 +451,11 @@ public abstract class RestServer {
this.requestTimeout.timeoutMs(requestTimeoutMs);
}
// For testing only
public void healthCheckTimeout(long healthCheckTimeoutMs) {
this.requestTimeout.healthCheckTimeoutMs(healthCheckTimeoutMs);
}
String determineAdvertisedProtocol() {
String advertisedSecurityProtocol = config.advertisedListener();
if (advertisedSecurityProtocol == null) {
@ -534,9 +540,11 @@ public abstract class RestServer {
private final RequestBinder binder;
private volatile long timeoutMs;
private volatile long healthCheckTimeoutMs;
public RequestTimeout(long initialTimeoutMs) {
public RequestTimeout(long initialTimeoutMs, long initialHealthCheckTimeoutMs) {
this.timeoutMs = initialTimeoutMs;
this.healthCheckTimeoutMs = initialHealthCheckTimeoutMs;
this.binder = new RequestBinder();
}
@ -545,10 +553,19 @@ public abstract class RestServer {
return timeoutMs;
}
@Override
public long healthCheckTimeoutMs() {
return healthCheckTimeoutMs;
}
public void timeoutMs(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
public void healthCheckTimeoutMs(long healthCheckTimeoutMs) {
this.healthCheckTimeoutMs = healthCheckTimeoutMs;
}
public Binder binder() {
return binder;
}

View File: org.apache.kafka.connect.runtime.rest.entities.WorkerStatus

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class WorkerStatus {
private static final WorkerStatus HEALTHY = new WorkerStatus(
"healthy",
"Worker has completed startup and is ready to handle requests."
);
private final String status;
private final String message;
@JsonCreator
private WorkerStatus(
@JsonProperty("status") String status,
@JsonProperty("message") String message
) {
this.status = status;
this.message = message;
}
public static WorkerStatus healthy() {
return HEALTHY;
}
public static WorkerStatus starting(String statusDetails) {
String message = "Worker is still starting up.";
if (statusDetails != null)
message += " " + statusDetails;
return new WorkerStatus(
"starting",
message
);
}
public static WorkerStatus unhealthy(String statusDetails) {
String message = "Worker was unable to handle this request and may be unable to handle other requests.";
if (statusDetails != null)
message += " " + statusDetails;
return new WorkerStatus(
"unhealthy",
message
);
}
@JsonProperty
public String status() {
return status;
}
@JsonProperty
public String message() {
return message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WorkerStatus that = (WorkerStatus) o;
return Objects.equals(status, that.status) && Objects.equals(message, that.message);
}
@Override
public int hashCode() {
return Objects.hash(status, message);
}
@Override
public String toString() {
return "WorkerStatus{" +
"status='" + status + '\'' +
", message='" + message + '\'' +
'}';
}
}
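
Jackson's @JsonCreator/@JsonProperty annotations define the wire format. A sketch of the three states serialized with a plain com.fasterxml.jackson.databind.ObjectMapper:

ObjectMapper mapper = new ObjectMapper();
mapper.writeValueAsString(WorkerStatus.healthy());
// {"status":"healthy","message":"Worker has completed startup and is ready to handle requests."}
mapper.writeValueAsString(WorkerStatus.starting(null));
// {"status":"starting","message":"Worker is still starting up."}
mapper.writeValueAsString(WorkerStatus.unhealthy(null));
// {"status":"unhealthy","message":"Worker was unable to handle this request and may be unable to handle other requests."}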

View File: org.apache.kafka.connect.runtime.rest.resources.RootResource

@ -16,14 +16,24 @@
*/
package org.apache.kafka.connect.runtime.rest.resources;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.entities.WorkerStatus;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.StagedTimeoutException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.swagger.v3.oas.annotations.Operation;
@ -32,15 +42,59 @@ import io.swagger.v3.oas.annotations.Operation;
public class RootResource {
private final Herder herder;
private final RestRequestTimeout requestTimeout;
private final Time time;
@Inject
public RootResource(Herder herder) {
public RootResource(Herder herder, RestRequestTimeout requestTimeout) {
this(herder, requestTimeout, Time.SYSTEM);
}
// For testing only
RootResource(Herder herder, RestRequestTimeout requestTimeout, Time time) {
this.herder = herder;
this.requestTimeout = requestTimeout;
this.time = time;
}
@GET
@Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to")
@Operation(summary = "Get details about this Connect worker and the ID of the Kafka cluster it is connected to")
public ServerInfo serverInfo() {
return new ServerInfo(herder.kafkaClusterId());
}
@GET
@Path("/health")
@Operation(summary = "Health check endpoint to verify worker readiness and liveness")
public Response healthCheck() throws Throwable {
WorkerStatus workerStatus;
int statusCode;
try {
FutureCallback<Void> cb = new FutureCallback<>();
herder.healthCheck(cb);
long timeoutNs = TimeUnit.MILLISECONDS.toNanos(requestTimeout.healthCheckTimeoutMs());
long deadlineNs = timeoutNs + time.nanoseconds();
time.waitForFuture(cb, deadlineNs);
statusCode = Response.Status.OK.getStatusCode();
workerStatus = WorkerStatus.healthy();
} catch (TimeoutException e) {
String statusDetails = e instanceof StagedTimeoutException
? ((StagedTimeoutException) e).stage().summarize()
: null;
if (!herder.isReady()) {
statusCode = Response.Status.SERVICE_UNAVAILABLE.getStatusCode();
workerStatus = WorkerStatus.starting(statusDetails);
} else {
statusCode = Response.Status.INTERNAL_SERVER_ERROR.getStatusCode();
workerStatus = WorkerStatus.unhealthy(statusDetails);
}
} catch (ExecutionException e) {
throw e.getCause();
}
return Response.status(statusCode).entity(workerStatus).build();
}
}
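
Taken together, the handler maps herder state onto HTTP status codes; illustrative exchanges (bodies abbreviated):

GET /health -> 200 {"status":"healthy", ...}    check completed within healthCheckTimeoutMs
GET /health -> 503 {"status":"starting", ...}   check timed out and isReady() is false
GET /health -> 500 {"status":"unhealthy", ...}  check timed out after startup completed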

View File: org.apache.kafka.connect.runtime.standalone.HealthCheckThread

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.standalone;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.Queue;
/**
* Thread that can be used to check for the readiness and liveness of a standalone herder.
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1017%3A+Health+check+endpoint+for+Kafka+Connect">KIP-1017</a>
*/
class HealthCheckThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(HealthCheckThread.class);
private final StandaloneHerder herder;
private final Queue<Callback<Void>> callbacks;
private volatile boolean running;
public HealthCheckThread(StandaloneHerder herder) {
this.herder = herder;
this.callbacks = new LinkedList<>();
this.running = true;
}
@Override
public void run() {
while (true) {
try {
Callback<Void> callback;
synchronized (this) {
while (running && this.callbacks.isEmpty())
wait();
if (!running)
break;
callback = callbacks.remove();
}
// For now, our only criterion for liveness (as opposed to readiness)
// in standalone mode is that the worker isn't deadlocked
synchronized (herder) {
try {
callback.onCompletion(null, null);
} catch (Throwable t) {
log.warn("Failed to complete health check callback", t);
}
}
} catch (Throwable t) {
log.warn("Health check thread encountered unexpected error", t);
}
}
Throwable shuttingDown = new ConnectException("The herder is shutting down");
// Don't leave callbacks dangling, even if shutdown has already started
while (!this.callbacks.isEmpty()) {
try {
callbacks.remove().onCompletion(shuttingDown, null);
} catch (Throwable t) {
log.warn("Failed to complete health check callback during shutdown", t);
}
}
}
/**
* Check the health of the herder. This method may be called at any time after
* the thread has been constructed, but callbacks will only be invoked after this
* thread has been {@link #start() started}.
*
* @param callback callback to invoke after herder health has been verified or if
* an error occurs that indicates the herder is unhealthy; may not be null
* @throws IllegalStateException if invoked after {@link #shutDown()}
*/
public void check(Callback<Void> callback) {
if (callback == null) {
log.warn("Ignoring null callback");
return;
}
synchronized (this) {
if (!running) {
throw new IllegalStateException("Cannot check herder health after thread has been shut down");
}
this.callbacks.add(callback);
notifyAll();
}
}
/**
* Trigger shutdown of the thread, stop accepting new callbacks via {@link #check(Callback)},
* and await the termination of the thread.
*/
public void shutDown() {
synchronized (this) {
this.running = false;
notifyAll();
}
try {
join();
} catch (InterruptedException e) {
log.warn(
"Interrupted during graceful shutdown; will interrupt health check thread "
+ "and then return immediately without waiting for thread to terminate",
e
);
this.interrupt();
// Preserve the interrupt status in case later operations block too
Thread.currentThread().interrupt();
}
}
}
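
A sketch of the thread's lifecycle in isolation, assuming a StandaloneHerder instance named herder and a 10-second timeout:

HealthCheckThread healthCheck = new HealthCheckThread(herder);
healthCheck.start();                   // begin servicing queued checks
FutureCallback<Void> cb = new FutureCallback<>();
healthCheck.check(cb);                 // completes once the herder lock can be acquired
cb.get(10, TimeUnit.SECONDS);
healthCheck.shutDown();                // rejects new checks, fails queued ones, joins the thread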

View File: org.apache.kafka.connect.runtime.standalone.StandaloneHerder

@ -74,6 +74,7 @@ public class StandaloneHerder extends AbstractHerder {
private final AtomicLong requestSeqNum = new AtomicLong();
private final ScheduledExecutorService requestExecutorService;
private final HealthCheckThread healthCheckThread;
// Visible for testing
ClusterConfigState configState;
@ -91,6 +92,7 @@ public class StandaloneHerder extends AbstractHerder {
}
// visible for testing
@SuppressWarnings("this-escape")
StandaloneHerder(Worker worker,
String workerId,
String kafkaClusterId,
@ -101,6 +103,7 @@ public class StandaloneHerder extends AbstractHerder {
super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy, time);
this.configState = ClusterConfigState.EMPTY;
this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
this.healthCheckThread = new HealthCheckThread(this);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@ -108,7 +111,6 @@ public class StandaloneHerder extends AbstractHerder {
public synchronized void start() {
log.info("Herder starting");
startServices();
running = true;
log.info("Herder started");
}
@ -124,10 +126,21 @@ public class StandaloneHerder extends AbstractHerder {
worker.stopAndAwaitConnector(connName);
}
stopServices();
running = false;
healthCheckThread.shutDown();
log.info("Herder stopped");
}
@Override
public void ready() {
super.ready();
healthCheckThread.start();
}
@Override
public void healthCheck(Callback<Void> cb) {
healthCheckThread.check(cb);
}
@Override
public int generation() {
return 0;

View File: org.apache.kafka.connect.util.Stage

@ -19,6 +19,8 @@ package org.apache.kafka.connect.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
public class Stage {
private static final Logger log = LoggerFactory.getLogger(Stage.class);
@ -59,6 +61,20 @@ public class Stage {
this.completed = time;
}
public String summarize() {
Long completed = this.completed;
if (completed != null) {
return "The last operation the worker completed was "
+ description() + ", which began at "
+ Instant.ofEpochMilli(started()) + " and completed at "
+ Instant.ofEpochMilli(completed) + ".";
} else {
return "The worker is currently "
+ description() + ", which began at "
+ Instant.ofEpochMilli(started()) + ".";
}
}
@Override
public String toString() {
return description + "(started " + started + ", completed=" + completed() + ")";
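
summarize() produces the human-readable fragment embedded in timeout messages and health check responses; an illustrative example (the rendered timestamp is schematic):

Stage stage = new Stage("reading to the end of internal topics", System.currentTimeMillis());
stage.summarize();
// "The worker is currently reading to the end of internal topics, which began at <start instant>."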

View File: org.apache.kafka.connect.integration.BlockingConnectorTest

@ -68,7 +68,6 @@ import javax.ws.rs.core.Response;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -94,10 +93,10 @@ public class BlockingConnectorTest {
private static final String CONNECTOR_INITIALIZE = "Connector::initialize";
private static final String CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS = "Connector::initializeWithTaskConfigs";
private static final String CONNECTOR_START = "Connector::start";
static final String CONNECTOR_START = "Connector::start";
private static final String CONNECTOR_RECONFIGURE = "Connector::reconfigure";
private static final String CONNECTOR_TASK_CLASS = "Connector::taskClass";
private static final String CONNECTOR_TASK_CONFIGS = "Connector::taskConfigs";
static final String CONNECTOR_TASK_CONFIGS = "Connector::taskConfigs";
private static final String CONNECTOR_STOP = "Connector::stop";
private static final String CONNECTOR_VALIDATE = "Connector::validate";
private static final String CONNECTOR_CONFIG = "Connector::config";
@ -378,7 +377,7 @@ public class BlockingConnectorTest {
);
}
// Reset the REST request timeout so that other requests aren't impacted
connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
connect.resetRequestTimeout();
}
public static class Block {

View File: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest

@ -69,6 +69,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.ws.rs.core.Response;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG;
@ -93,7 +95,6 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CON
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.containsString;
@ -182,7 +183,7 @@ public class ConnectWorkerIntegrationTest {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks are not all in running state.");
Set<WorkerHandle> workers = connect.activeWorkers();
Set<WorkerHandle> workers = connect.healthyWorkers();
assertTrue(workers.contains(extraWorker));
connect.removeWorker(extraWorker);
@ -190,7 +191,7 @@ public class ConnectWorkerIntegrationTest {
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
"Group of workers did not shrink in time.");
workers = connect.activeWorkers();
workers = connect.healthyWorkers();
assertFalse(workers.contains(extraWorker));
}
@ -259,6 +260,25 @@ public class ConnectWorkerIntegrationTest {
connect.kafka().stopOnlyKafka();
connect.requestTimeout(1000);
assertFalse(
connect.anyWorkersHealthy(),
"No workers should be healthy when underlying Kafka cluster is down"
);
connect.workers().forEach(worker -> {
try (Response response = connect.healthCheck(worker)) {
assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
assertNotNull(response.getEntity());
String body = response.getEntity().toString();
String expectedSubstring = "Worker was unable to handle this request and may be unable to handle other requests";
assertTrue(
body.contains(expectedSubstring),
"Response body '" + body + "' did not contain expected message '" + expectedSubstring + "'"
);
}
});
connect.resetRequestTimeout();
// Allow the workers to discover that the coordinator is unavailable; the wait is
// heartbeat timeout * 2 + 4 seconds
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
@ -909,6 +929,8 @@ public class ConnectWorkerIntegrationTest {
private void assertTimeoutException(Runnable operation, String expectedStageDescription) throws InterruptedException {
connect.requestTimeout(1_000);
AtomicReference<Throwable> latestError = new AtomicReference<>();
// Wait for the specific operation against the Connect cluster to time out
waitForCondition(
() -> {
try {
@ -941,7 +963,25 @@ public class ConnectWorkerIntegrationTest {
}
}
);
connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
// Ensure that the health check endpoints of all workers also report the same timeout message
connect.workers().forEach(worker -> {
try (Response response = connect.healthCheck(worker)) {
assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
assertNotNull(response.getEntity());
String body = response.getEntity().toString();
String expectedSubstring = "Worker was unable to handle this request and may be unable to handle other requests";
assertTrue(
body.contains(expectedSubstring),
"Response body '" + body + "' did not contain expected message '" + expectedSubstring + "'"
);
assertTrue(
body.contains(expectedStageDescription),
"Response body '" + body + "' did not contain expected message '" + expectedStageDescription + "'"
);
}
});
connect.resetRequestTimeout();
}
/**
@ -1142,7 +1182,7 @@ public class ConnectWorkerIntegrationTest {
connect.deleteConnector(CONNECTOR_NAME);
// Roll the entire cluster
connect.activeWorkers().forEach(connect::removeWorker);
connect.healthyWorkers().forEach(connect::removeWorker);
// Miserable hack: produce directly to the config topic and then wait a little bit
// in order to trigger segment rollover and allow compaction to take place

View File: org.apache.kafka.connect.integration.InternalTopicsIntegrationTest

@ -33,7 +33,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.ws.rs.core.Response;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test for the creation of internal topics.
@ -135,7 +140,19 @@ public class InternalTopicsIntegrationTest {
log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);
// Try to start a worker
connect.addWorker();
WorkerHandle worker = connect.addWorker();
connect.requestTimeout(1000);
try (Response response = connect.healthCheck(worker)) {
assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), response.getStatus());
assertNotNull(response.getEntity());
String body = response.getEntity().toString();
assertTrue(
body.contains("The worker is currently initializing and reading to the end of internal topics"),
"Body did not contain expected message detailing the worker's in-progress operation: " + body
);
}
connect.resetRequestTimeout();
// Verify that the offset and config topics don't exist;
// the status topic may have been created if timing was right but we don't care
@ -185,9 +202,9 @@ public class InternalTopicsIntegrationTest {
// Try to start one worker, with three bad topics
WorkerHandle worker = connect.addWorker(); // should have failed to start before returning
assertFalse(worker.isRunning());
assertFalse(connect.allWorkersRunning());
assertFalse(connect.anyWorkersRunning());
assertFalse(connect.isHealthy(worker));
assertFalse(connect.allWorkersHealthy());
assertFalse(connect.anyWorkersHealthy());
connect.removeWorker(worker);
// We rely upon the fact that we can change the worker properties before the workers are started
@ -195,9 +212,9 @@ public class InternalTopicsIntegrationTest {
// Try to start one worker, with two bad topics remaining
worker = connect.addWorker(); // should have failed to start before returning
assertFalse(worker.isRunning());
assertFalse(connect.allWorkersRunning());
assertFalse(connect.anyWorkersRunning());
assertFalse(connect.isHealthy(worker));
assertFalse(connect.allWorkersHealthy());
assertFalse(connect.anyWorkersHealthy());
connect.removeWorker(worker);
// We rely upon the fact that we can change the worker properties before the workers are started
@ -205,9 +222,9 @@ public class InternalTopicsIntegrationTest {
// Try to start one worker, with one bad topic remaining
worker = connect.addWorker(); // should have failed to start before returning
assertFalse(worker.isRunning());
assertFalse(connect.allWorkersRunning());
assertFalse(connect.anyWorkersRunning());
assertFalse(connect.isHealthy(worker));
assertFalse(connect.allWorkersHealthy());
assertFalse(connect.anyWorkersHealthy());
connect.removeWorker(worker);
// We rely upon the fact that we can change the worker properties before the workers are started
workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "good-status");

View File: org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest

@ -184,7 +184,7 @@ public class SourceConnectorsIntegrationTest {
connect.assertions().assertTopicsDoNotExist(FOO_TOPIC);
connect.activeWorkers().forEach(w -> connect.removeWorker(w));
connect.healthyWorkers().forEach(w -> connect.removeWorker(w));
workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(true));

View File: org.apache.kafka.connect.integration.StandaloneWorkerIntegrationTest

@ -19,6 +19,7 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
@ -26,6 +27,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@ -33,45 +36,65 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import static org.apache.kafka.connect.integration.BlockingConnectorTest.Block.BLOCK_CONFIG;
import static org.apache.kafka.connect.integration.BlockingConnectorTest.CONNECTOR_START;
import static org.apache.kafka.connect.integration.BlockingConnectorTest.CONNECTOR_TASK_CONFIGS;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class StandaloneWorkerIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(StandaloneWorkerIntegrationTest.class);
private static final String CONNECTOR_NAME = "test-connector";
private static final int NUM_TASKS = 4;
private static final String TOPIC_NAME = "test-topic";
private EmbeddedConnectStandalone.Builder connectBuilder;
private EmbeddedConnectStandalone connect;
@BeforeEach
public void setup() {
connect = new EmbeddedConnectStandalone.Builder()
.build();
connect.start();
connectBuilder = new EmbeddedConnectStandalone.Builder();
}
@AfterEach
public void cleanup() {
// Unblock everything so that we don't leak threads even if a test run fails
BlockingConnectorTest.Block.reset();
if (connect != null)
connect.stop();
BlockingConnectorTest.Block.join();
}
@Test
public void testDynamicLogging() {
connect = connectBuilder.build();
connect.start();
Map<String, LoggerLevel> initialLevels = connect.allLogLevels();
assertFalse(initialLevels.isEmpty(), "Connect REST API did not list any known loggers");
Map<String, LoggerLevel> invalidModifiedLoggers = Utils.filterMap(
@ -219,6 +242,9 @@ public class StandaloneWorkerIntegrationTest {
@Test
public void testCreateConnectorWithStoppedInitialState() throws Exception {
connect = connectBuilder.build();
connect.start();
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME),
@ -243,9 +269,108 @@ public class StandaloneWorkerIntegrationTest {
);
}
@Test
public void testHealthCheck() throws Exception {
int numTasks = 1; // The blocking connector only generates a single task anyway
Map<String, String> blockedConnectorConfig = new HashMap<>();
blockedConnectorConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingConnector.class.getName());
blockedConnectorConfig.put(NAME_CONFIG, CONNECTOR_NAME);
blockedConnectorConfig.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
blockedConnectorConfig.put(BLOCK_CONFIG, CONNECTOR_START);
connect = connectBuilder.withCommandLineConnector(blockedConnectorConfig).build();
Thread workerThread = new Thread(connect::start);
workerThread.setName("integration-test-standalone-connect-worker");
try {
workerThread.start();
AtomicReference<Response> healthCheckResponse = new AtomicReference<>();
connect.requestTimeout(1_000);
waitForCondition(
() -> {
Response response = connect.healthCheck();
healthCheckResponse.set(response);
return true;
},
60_000,
"Health check endpoint for standalone worker was not available in time"
);
connect.resetRequestTimeout();
// Worker hasn't completed startup; should be serving 503 responses from the health check endpoint
try (Response response = healthCheckResponse.get()) {
assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), response.getStatus());
assertNotNull(response.getEntity());
String body = response.getEntity().toString();
assertTrue(
body.contains("Worker is still starting up"),
"Body did not contain expected message: " + body
);
}
BlockingConnectorTest.Block.reset();
// Worker has completed startup by this point; should serve 200 responses
try (Response response = connect.healthCheck()) {
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
assertNotNull(response.getEntity());
String body = response.getEntity().toString();
assertTrue(
body.contains("Worker has completed startup and is ready to handle requests."),
"Body did not contain expected message: " + body
);
}
// And, if the worker claims that it's healthy, it should have also been able to generate tasks
// for the now-unblocked connector
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"Connector or tasks did not start running healthily in time"
);
// Hack: if a connector blocks in its taskConfigs method, then the worker gets blocked
// If that bug is ever fixed, we can remove this section from the test (it's not worth keeping
// the bug just for the coverage it provides)
blockedConnectorConfig.put(BLOCK_CONFIG, CONNECTOR_TASK_CONFIGS);
connect.requestTimeout(1_000);
assertThrows(
ConnectRestException.class,
() -> connect.configureConnector(CONNECTOR_NAME, blockedConnectorConfig)
);
// Worker has completed startup but is now blocked; should serve 500 responses
try (Response response = connect.healthCheck()) {
assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
assertNotNull(response.getEntity());
String body = response.getEntity().toString();
assertTrue(
body.contains("Worker was unable to handle this request and may be unable to handle other requests."),
"Body did not contain expected message: " + body
);
}
connect.resetRequestTimeout();
BlockingConnectorTest.Block.reset();
connect.deleteConnector(CONNECTOR_NAME);
} finally {
if (workerThread.isAlive()) {
log.debug("Standalone worker startup not completed yet; interrupting and waiting for startup to finish");
workerThread.interrupt();
workerThread.join(TimeUnit.MINUTES.toMillis(1));
if (workerThread.isAlive()) {
log.warn("Standalone worker startup never completed; abandoning thread");
}
}
}
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup props for the source connector
Map<String, String> props = new HashMap<>();
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPIC_CONFIG, topic);

View File: org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResourceTest

@ -39,7 +39,6 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@ -88,6 +87,8 @@ import java.util.stream.Stream;
import javax.ws.rs.BadRequestException;
import static java.util.Arrays.asList;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -216,7 +217,10 @@ public class ConnectorPluginsResourceTest {
doReturn(HEADER_CONVERTER_PLUGINS).when(plugins).headerConverters();
doReturn(TRANSFORMATION_PLUGINS).when(plugins).transformations();
doReturn(PREDICATE_PLUGINS).when(plugins).predicates();
RestRequestTimeout requestTimeout = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
RestRequestTimeout requestTimeout = RestRequestTimeout.constant(
DEFAULT_REST_REQUEST_TIMEOUT_MS,
DEFAULT_HEALTH_CHECK_TIMEOUT_MS
);
connectorPluginsResource = new ConnectorPluginsResource(herder, requestTimeout);
}

View File: org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest

@ -28,7 +28,6 @@ import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@ -72,6 +71,8 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -159,7 +160,10 @@ public class ConnectorsResourceTest {
private static final Set<String> CONNECTOR_ACTIVE_TOPICS = new HashSet<>(
Arrays.asList("foo_topic", "bar_topic"));
private static final RestRequestTimeout REQUEST_TIMEOUT = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
private static final RestRequestTimeout REQUEST_TIMEOUT = RestRequestTimeout.constant(
DEFAULT_REST_REQUEST_TIMEOUT_MS,
DEFAULT_HEALTH_CHECK_TIMEOUT_MS
);
@Mock
private Herder herder;

View File: org.apache.kafka.connect.runtime.rest.resources.InternalConnectResourceTest

@ -20,6 +20,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.util.Callback;
@ -69,6 +70,10 @@ public class InternalConnectResourceTest {
}
private static final String FENCE_PATH = "/connectors/" + CONNECTOR_NAME + "/fence";
private static final String TASK_CONFIGS_PATH = "/connectors/" + CONNECTOR_NAME + "/tasks";
private static final RestRequestTimeout REST_REQUEST_TIMEOUT = RestRequestTimeout.constant(
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS,
RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS
);
@Mock
private UriInfo uriInfo;
@ -81,7 +86,7 @@ public class InternalConnectResourceTest {
@BeforeEach
public void setup() {
internalResource = new InternalConnectResource(herder, restClient, () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
internalResource = new InternalConnectResource(herder, restClient, REST_REQUEST_TIMEOUT);
internalResource.uriInfo = uriInfo;
}

View File: org.apache.kafka.connect.runtime.rest.resources.RootResourceTest

@ -18,8 +18,13 @@ package org.apache.kafka.connect.runtime.rest.resources;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.entities.WorkerStatus;
import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.StagedTimeoutException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -28,8 +33,20 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Stubber;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.core.Response;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -38,11 +55,17 @@ import static org.mockito.Mockito.when;
public class RootResourceTest {
@Mock private Herder herder;
@Mock private Time time;
private RootResource rootResource;
private static final RestRequestTimeout REQUEST_TIMEOUT = RestRequestTimeout.constant(
DEFAULT_REST_REQUEST_TIMEOUT_MS,
DEFAULT_HEALTH_CHECK_TIMEOUT_MS
);
@BeforeEach
public void setUp() {
rootResource = new RootResource(herder);
rootResource = new RootResource(herder, REQUEST_TIMEOUT, time);
}
@Test
@ -56,4 +79,94 @@ public class RootResourceTest {
verify(herder).kafkaClusterId();
}
@Test
public void testHealthCheckRunning() throws Throwable {
expectHealthCheck(null);
Response response = rootResource.healthCheck();
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
WorkerStatus expectedStatus = WorkerStatus.healthy();
WorkerStatus actualStatus = workerStatus(response);
assertEquals(expectedStatus, actualStatus);
}
@Test
public void testHealthCheckStarting() throws Throwable {
expectHealthCheck(new TimeoutException());
when(herder.isReady()).thenReturn(false);
Response response = rootResource.healthCheck();
assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), response.getStatus());
WorkerStatus expectedStatus = WorkerStatus.starting(null);
WorkerStatus actualStatus = workerStatus(response);
assertEquals(expectedStatus, actualStatus);
}
@Test
public void testHealthCheckStartingWithStage() throws Throwable {
String stageDescription = "experiencing a simulated failure for testing purposes";
Stage stage = new Stage(stageDescription, 0);
StagedTimeoutException exception = new StagedTimeoutException(stage);
expectHealthCheck(exception);
when(herder.isReady()).thenReturn(false);
Response response = rootResource.healthCheck();
assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), response.getStatus());
WorkerStatus expectedStatus = WorkerStatus.starting(stage.summarize());
WorkerStatus actualStatus = workerStatus(response);
assertEquals(expectedStatus, actualStatus);
assertTrue(
actualStatus.message().contains(stageDescription),
"Status message '" + actualStatus.message() + "' did not contain stage description '" + stageDescription + "'"
);
}
@Test
public void testHealthCheckUnhealthy() throws Throwable {
expectHealthCheck(new TimeoutException());
when(herder.isReady()).thenReturn(true);
Response response = rootResource.healthCheck();
assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
WorkerStatus expectedStatus = WorkerStatus.unhealthy(null);
WorkerStatus actualStatus = workerStatus(response);
assertEquals(expectedStatus, actualStatus);
}
@Test
public void testHealthCheckUnhealthyWithStage() throws Throwable {
String stageDescription = "experiencing a simulated failure for testing purposes";
Stage stage = new Stage(stageDescription, 0);
StagedTimeoutException exception = new StagedTimeoutException(stage);
expectHealthCheck(exception);
when(herder.isReady()).thenReturn(true);
Response response = rootResource.healthCheck();
assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
WorkerStatus expectedStatus = WorkerStatus.unhealthy(stage.summarize());
WorkerStatus actualStatus = workerStatus(response);
assertEquals(expectedStatus, actualStatus);
assertTrue(
actualStatus.message().contains(stageDescription),
"Status message '" + actualStatus.message() + "' did not contain stage description '" + stageDescription + "'"
);
}
private WorkerStatus workerStatus(Response response) {
return (WorkerStatus) (response.getEntity());
}
private void expectHealthCheck(Throwable error) throws Throwable {
Stubber stubber = error != null
? doThrow(error)
: doReturn(null);
stubber.when(time).waitForFuture(any(), anyLong());
}
}

View File: org.apache.kafka.connect.util.clusters.ConnectAssertions

@ -101,7 +101,7 @@ public class ConnectAssertions {
*/
protected Optional<Boolean> checkWorkersUp(int numWorkers, BiFunction<Integer, Integer, Boolean> comp) {
try {
int numUp = connect.activeWorkers().size();
int numUp = connect.healthyWorkers().size();
return Optional.of(comp.apply(numUp, numWorkers));
} catch (Exception e) {
log.error("Could not check healthy workers.", e);

View File: org.apache.kafka.connect.util.clusters.EmbeddedConnect

@ -129,14 +129,17 @@ abstract class EmbeddedConnect {
Exit.setExitProcedure(exitProcedure);
Exit.setHaltProcedure(haltProcedure);
}
kafkaCluster.start();
startConnect();
try {
httpClient.start();
} catch (Exception e) {
throw new ConnectException("Failed to start HTTP client", e);
}
startConnect();
try {
if (numBrokers > 0) {
assertions().assertExactlyNumBrokersAreUp(
@ -206,6 +209,39 @@ abstract class EmbeddedConnect {
workers().forEach(worker -> worker.requestTimeout(requestTimeoutMs));
}
/**
* Reset the REST request timeout to the default value that's used in non-testing
* environments. Useful if it has been previously modified using {@link #requestTimeout(long)}.
*/
public void resetRequestTimeout() {
workers().forEach(WorkerHandle::resetRequestTimeout);
}
/**
* Check to see if the worker is healthy, using the health check endpoint introduced in
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1017%3A+Health+check+endpoint+for+Kafka+Connect">KIP-1017</a>.
* @param workerHandle the worker to check; may not be null
* @return whether the worker is healthy, based on its health check endpoint
*/
public boolean isHealthy(WorkerHandle workerHandle) {
try (Response response = healthCheck(workerHandle)) {
return response.getStatus() == Response.Status.OK.getStatusCode();
} catch (Exception e) {
log.debug("Failed to check for worker readiness", e);
return false;
}
}
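// Illustrative polling usage, assuming the org.apache.kafka.test.TestUtils
// helper is available (timeout and message are example values):
//
//     TestUtils.waitForCondition(() -> connect.isHealthy(worker), 60_000,
//             "Worker did not pass its health check in time");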
/**
* Contact the health check endpoint for the worker.
* @param workerHandle the worker to contact; may not be null
* @return the response from the worker
*/
public Response healthCheck(WorkerHandle workerHandle) {
String url = workerHandle.url().resolve("health").toString();
return requestGet(url);
}
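// For reference, the health check response body defined by KIP-1017 is JSON
// with "status" and "message" fields; the exact wording below is illustrative:
//
//     {"status": "healthy", "message": "Worker has completed startup and is ready to handle requests."}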
/**
* Configure a connector. If the connector does not already exist, a new one will be created and
* the given configuration will be applied to it.
@ -1036,30 +1072,12 @@ abstract class EmbeddedConnect {
*
* @return the set of handles of the healthy workers
*/
public Set<WorkerHandle> activeWorkers() {
public Set<WorkerHandle> healthyWorkers() {
return workers().stream()
.filter(w -> {
try {
String endpoint = w.url().resolve("/connectors/liveness-check").toString();
Response response = requestGet(endpoint);
boolean live = response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
|| response.getStatus() == Response.Status.OK.getStatusCode();
if (live) {
return true;
} else {
log.warn("Worker failed liveness probe. Response: {}", response);
return false;
}
} catch (Exception e) {
// Worker failed to respond; consider it offline
log.warn("Failed to contact worker during liveness check", e);
return false;
}
})
.filter(this::isHealthy)
.collect(Collectors.toSet());
}
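// Illustrative usage in an assertion (NUM_WORKERS is a hypothetical constant):
//
//     assertEquals(NUM_WORKERS, connect.healthyWorkers().size());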
/**
* Return the available assertions for this Connect cluster
*

View File

@ -126,8 +126,8 @@ public class EmbeddedConnectCluster extends EmbeddedConnect {
*
* @return true if any worker is healthy, or false otherwise
*/
public boolean anyWorkersRunning() {
return workers().stream().anyMatch(WorkerHandle::isRunning);
public boolean anyWorkersHealthy() {
return workers().stream().anyMatch(this::isHealthy);
}
/**
@ -135,8 +135,8 @@ public class EmbeddedConnectCluster extends EmbeddedConnect {
*
* @return true if all workers are healthy, or false otherwise
*/
public boolean allWorkersRunning() {
return workers().stream().allMatch(WorkerHandle::isRunning);
public boolean allWorkersHealthy() {
return workers().stream().allMatch(this::isHealthy);
}
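// Illustrative usage, again assuming the Kafka TestUtils helper (values are
// examples):
//
//     TestUtils.waitForCondition(connect::allWorkersHealthy, 60_000,
//             "Not all workers passed their health checks in time");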
@Override

View File

@ -18,18 +18,27 @@ package org.apache.kafka.connect.util.clusters;
import org.apache.kafka.connect.cli.ConnectStandalone;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import javax.ws.rs.core.Response;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
@ -49,9 +58,10 @@ public class EmbeddedConnectStandalone extends EmbeddedConnect {
private static final String REST_HOST_NAME = "localhost";
private final Map<String, String> workerProps;
private final List<Map<String, String>> connectorConfigs;
private final String offsetsFile;
private WorkerHandle connectWorker;
private volatile WorkerHandle connectWorker;
private EmbeddedConnectStandalone(
int numBrokers,
@ -59,10 +69,12 @@ public class EmbeddedConnectStandalone extends EmbeddedConnect {
boolean maskExitProcedures,
Map<String, String> clientProps,
Map<String, String> workerProps,
List<Map<String, String>> connectorConfigs,
String offsetsFile
) {
super(numBrokers, brokerProps, maskExitProcedures, clientProps);
this.workerProps = workerProps;
this.connectorConfigs = connectorConfigs;
this.offsetsFile = offsetsFile;
}
@ -79,8 +91,10 @@ public class EmbeddedConnectStandalone extends EmbeddedConnect {
workerProps.putIfAbsent(VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
workerProps.putIfAbsent(PLUGIN_DISCOVERY_CONFIG, "hybrid_fail");
Connect connect = new ConnectStandalone().startConnect(workerProps);
ConnectStandalone cli = new ConnectStandalone();
Connect<StandaloneHerder> connect = cli.startConnect(workerProps);
connectWorker = new WorkerHandle("standalone", connect);
cli.processExtraArgs(connect, connectorConfigFiles());
}
@Override
@ -97,8 +111,35 @@ public class EmbeddedConnectStandalone extends EmbeddedConnect {
: Collections.emptySet();
}
public Response healthCheck() {
Objects.requireNonNull(connectWorker, "Cannot perform health check before starting worker");
return healthCheck(connectWorker);
}
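// Illustrative usage once the worker has started (Response is auto-closeable
// in JAX-RS 2.1+):
//
//     try (Response response = connect.healthCheck()) {
//         assertEquals(200, response.getStatus());
//     }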
/**
* Write each command-line connector config to a temporary properties file and
* return the file paths, for use as extra CLI arguments to ConnectStandalone.
*/
private String[] connectorConfigFiles() {
String[] result = new String[connectorConfigs.size()];
for (int i = 0; i < connectorConfigs.size(); i++) {
try {
File connectorConfigFile = TestUtils.tempFile("standalone-connect", "connector-" + i);
Properties connectorConfigProps = new Properties();
connectorConfigProps.putAll(connectorConfigs.get(i));
try (OutputStream outputStream = Files.newOutputStream(connectorConfigFile.toPath())) {
connectorConfigProps.store(outputStream, "");
}
result[i] = connectorConfigFile.getAbsolutePath();
} catch (IOException e) {
throw new UncheckedIOException("Failed to create temporary config file for connector " + i, e);
}
}
return result;
}
public static class Builder extends EmbeddedConnectBuilder<EmbeddedConnectStandalone, Builder> {
private final List<Map<String, String>> connectorConfigs = new ArrayList<>();
private String offsetsFile = null;
public Builder offsetsFile(String offsetsFile) {
@ -106,6 +147,11 @@ public class EmbeddedConnectStandalone extends EmbeddedConnect {
return this;
}
public Builder withCommandLineConnector(Map<String, String> connectorConfig) {
this.connectorConfigs.add(connectorConfig);
return this;
}
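// Illustrative builder usage; the connector name, class, and topic are example
// values only:
//
//     EmbeddedConnectStandalone connect = new EmbeddedConnectStandalone.Builder()
//             .withCommandLineConnector(Map.of(
//                     "name", "example-source",
//                     "connector.class", "org.apache.kafka.connect.tools.VerifiableSourceConnector",
//                     "topic", "example-topic"))
//             .build();
//     connect.start();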
@Override
protected EmbeddedConnectStandalone build(
int numBrokers,
@ -123,6 +169,7 @@ public class EmbeddedConnectStandalone extends EmbeddedConnect {
maskExitProcedures,
clientProps,
workerProps,
connectorConfigs,
offsetsFile
);
}

View File

@ -208,6 +208,18 @@ public class EmbeddedKafkaCluster {
}
private void stop(boolean deleteLogDirs, boolean stopZK) {
maybeShutDownProducer();
triggerBrokerShutdown();
awaitBrokerShutdown();
if (deleteLogDirs)
deleteLogDirs();
if (stopZK)
stopZK();
}
private void maybeShutDownProducer() {
try {
if (producer != null) {
producer.close();
@ -216,7 +228,9 @@ public class EmbeddedKafkaCluster {
log.error("Could not shutdown producer ", e);
throw new RuntimeException("Could not shutdown producer", e);
}
}
private void triggerBrokerShutdown() {
for (KafkaServer broker : brokers) {
try {
broker.shutdown();
@ -226,8 +240,21 @@ public class EmbeddedKafkaCluster {
throw new RuntimeException(msg, t);
}
}
}
if (deleteLogDirs) {
private void awaitBrokerShutdown() {
for (KafkaServer broker : brokers) {
try {
broker.awaitShutdown();
} catch (Throwable t) {
String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker));
log.error(msg, t);
throw new RuntimeException(msg, t);
}
}
}
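// Design note: shutdown is triggered for every broker before any broker is
// awaited, so the brokers shut down concurrently rather than one at a time.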
private void deleteLogDirs() {
for (KafkaServer broker : brokers) {
try {
log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
@ -241,10 +268,9 @@ public class EmbeddedKafkaCluster {
}
}
private void stopZK() {
try {
if (stopZK) {
zookeeper.shutdown();
}
} catch (Throwable t) {
String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
log.error(msg, t);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.connect.util.clusters;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.rest.RestServer;
import java.net.URI;
import java.util.Map;
@ -29,9 +30,9 @@ import java.util.Objects;
public class WorkerHandle {
private final String workerName;
private final Connect worker;
private final Connect<?> worker;
protected WorkerHandle(String workerName, Connect worker) {
protected WorkerHandle(String workerName, Connect<?> worker) {
this.workerName = workerName;
this.worker = worker;
}
@ -54,24 +55,6 @@ public class WorkerHandle {
worker.stop();
}
/**
* Determine if this worker is running.
*
* @return true if the worker is running, or false otherwise
*/
public boolean isRunning() {
return worker.isRunning();
}
/**
* Get the worker's name corresponding to this handle.
*
* @return the worker's name
*/
public String name() {
return workerName;
}
/**
* Get the worker's URL that accepts requests to its REST endpoint.
*
@ -91,13 +74,22 @@ public class WorkerHandle {
}
/**
* Set a new timeout for REST requests to the worker. Useful if a request is expected
* to block, since the time spent awaiting that request can be reduced and test runtime
* bloat can be avoided.
* @param requestTimeoutMs the new timeout in milliseconds; must be positive
* Set a new timeout for REST requests to the worker, including health check requests.
* Useful if a request is expected to block, since the time spent awaiting that request
* can be reduced and test runtime bloat can be avoided.
* @param timeoutMs the new timeout in milliseconds; must be positive
*/
public void requestTimeout(long requestTimeoutMs) {
worker.rest().requestTimeout(requestTimeoutMs);
public void requestTimeout(long timeoutMs) {
worker.rest().requestTimeout(timeoutMs);
worker.rest().healthCheckTimeout(timeoutMs);
}
/**
* Reset the timeout for REST requests to the worker, including health check requests.
*/
public void resetRequestTimeout() {
worker.rest().requestTimeout(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
worker.rest().healthCheckTimeout(RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS);
}
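// Design note: the two methods above always adjust the regular REST request
// timeout and the health check timeout together, so tests cannot leave the
// two values out of sync.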
@Override