KAFKA-15563: Provide informative error messages when Connect REST requests time out (#14562)

Reviewers: Greg Harris <greg.harris@aiven.io>
Chris Egerton 2023-12-11 16:48:16 -05:00 committed by GitHub
parent 839e75a729
commit 2a5fbf2882
17 changed files with 1059 additions and 401 deletions
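
This change threads human-readable Stage descriptions through the herder and its callbacks so that, when a REST request times out, the error names the operation the worker was stuck on. The sketch below is not part of the commit; the class name StagedTimeoutSketch, the sample stage description, and the 1-second/5-second timings are illustrative. It shows how the pieces introduced here (Stage, TemporaryStage, StagedTimeoutException, and the Callback.recordStage hook picked up by FutureCallback) are intended to interact.

// Minimal sketch, not part of this commit: class name, stage description, and timings are illustrative.
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.StagedTimeoutException;
import org.apache.kafka.connect.util.TemporaryStage;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class StagedTimeoutSketch {
    public static void main(String[] args) throws Exception {
        Time time = Time.SYSTEM;
        FutureCallback<Void> callback = new FutureCallback<>();

        // A worker-side thread reports what it is doing while servicing the request.
        // TemporaryStage records a new Stage on the callback and completes it on close().
        Thread worker = new Thread(() -> {
            try (TemporaryStage stage = new TemporaryStage("reading to the end of the config topic", callback, time)) {
                time.sleep(5_000); // the slow operation; this stage stays current until close()
            }
            callback.onCompletion(null, null);
        });
        worker.start();

        try {
            // The REST layer waits with a bounded timeout, as HerderRequestHandler does.
            callback.get(1, TimeUnit.SECONDS);
        } catch (StagedTimeoutException e) {
            // The timeout now carries the last recorded stage, which the REST resources
            // turn into a message like "Request timed out. The worker is currently ...".
            Stage stage = e.stage();
            System.out.println("Request timed out. The worker is currently " + stage.description()
                    + ", which began at " + Instant.ofEpochMilli(stage.started()));
        }
    }
}

In the worker itself, DistributedHerder records analogous stages on its tick thread (TickThreadStage) and propagates them to every queued request via Callback.recordStage, which is what lets HerderRequestHandler and ConnectorPluginsResource build the messages asserted in the new integration tests below.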

View File

@ -159,7 +159,7 @@
files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
<suppress checks="NPathComplexity"
files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
<!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling"

View File

@ -59,6 +59,8 @@ import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.log4j.Level;
import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.TemporaryStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -83,6 +85,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -124,6 +127,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
protected volatile boolean running = false;
private final ExecutorService connectorExecutor;
private final Time time;
protected final Loggers loggers;
private final ConcurrentMap<String, Connector> tempConnectors = new ConcurrentHashMap<>();
@ -143,6 +147,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
this.configBackingStore = configBackingStore;
this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
this.connectorExecutor = Executors.newCachedThreadPool();
this.time = time;
this.loggers = new Loggers(time);
}
@ -394,9 +399,17 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
@Override
public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback, boolean doLog) {
Stage waitingForThread = new Stage(
"waiting for a new thread to become available for connector validation",
time.milliseconds()
);
callback.recordStage(waitingForThread);
connectorExecutor.submit(() -> {
waitingForThread.complete(time.milliseconds());
try {
ConfigInfos result = validateConnectorConfig(connectorProps, doLog);
Function<String, TemporaryStage> reportStage = description ->
new TemporaryStage(description, callback, time);
ConfigInfos result = validateConnectorConfig(connectorProps, reportStage, doLog);
callback.onCompletion(null, result);
} catch (Throwable t) {
callback.onCompletion(t, null);
@ -468,10 +481,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
|| SinkConnectorConfig.hasDlqTopicConfig(connProps);
}
ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean doLog) {
ConfigInfos validateConnectorConfig(
Map<String, String> connectorProps,
Function<String, TemporaryStage> reportStage,
boolean doLog
) {
String stageDescription;
if (worker.configTransformer() != null) {
stageDescription = "resolving transformed configuration properties for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
connectorProps = worker.configTransformer().transform(connectorProps);
}
}
String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
if (connType == null)
throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
@ -485,12 +506,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
if (connector instanceof SourceConnector) {
connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
enrichedConfigDef = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.configDef(), connectorProps, false);
stageDescription = "validating source connector-specific properties for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
validatedConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, connectorProps);
}
} else {
connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
enrichedConfigDef = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.configDef(), connectorProps, false);
stageDescription = "validating sink connector-specific properties for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
validatedConnectorConfig = validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, connectorProps);
}
}
connectorProps.entrySet().stream()
.filter(e -> e.getValue() == null)
@ -505,7 +532,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
// do custom connector-specific validation
ConfigDef configDef = connector.config();
ConfigDef configDef;
stageDescription = "retrieving the configuration definition from the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
configDef = connector.config();
}
if (null == configDef) {
throw new BadRequestException(
String.format(
@ -514,7 +545,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
)
);
}
Config config = connector.validate(connectorProps);
Config config;
stageDescription = "performing multi-property validation for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
config = connector.validate(connectorProps);
}
if (null == config) {
throw new BadRequestException(
String.format(
@ -535,6 +571,8 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
ConfigInfos adminConfigInfos = null;
if (connectorUsesProducer(connectorType, connectorProps)) {
stageDescription = "validating producer config overrides for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
producerConfigInfos = validateClientOverrides(
connName,
ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
@ -545,7 +583,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
ConnectorClientConfigRequest.ClientType.PRODUCER,
connectorClientConfigOverridePolicy);
}
}
if (connectorUsesAdmin(connectorType, connectorProps)) {
stageDescription = "validating admin config overrides for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
adminConfigInfos = validateClientOverrides(
connName,
ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
@ -556,7 +597,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
ConnectorClientConfigRequest.ClientType.ADMIN,
connectorClientConfigOverridePolicy);
}
}
if (connectorUsesConsumer(connectorType, connectorProps)) {
stageDescription = "validating consumer config overrides for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
consumerConfigInfos = validateClientOverrides(
connName,
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
@ -567,6 +611,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
ConnectorClientConfigRequest.ClientType.CONSUMER,
connectorClientConfigOverridePolicy);
}
}
return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos);
}
}

View File

@ -75,6 +75,8 @@ import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.TemporaryStage;
import org.slf4j.Logger;
import javax.crypto.KeyGenerator;
@ -110,6 +112,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.common.utils.Utils.UncheckedCloseable;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
@ -213,6 +216,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private volatile long keyExpiration;
private short currentProtocolVersion;
private short backoffRetries;
private volatile DistributedHerderRequest currentRequest;
private volatile Stage tickThreadStage;
// visible for testing
// The latest pending restart request for each named connector
@ -333,6 +338,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
keyExpiration = Long.MAX_VALUE;
sessionKey = null;
backoffRetries = BACKOFF_RETRIES;
currentRequest = null;
tickThreadStage = new Stage("awaiting startup", time.milliseconds());
currentProtocolVersion = ConnectProtocolCompatibility.compatibility(
config.getString(DistributedConfig.CONNECT_PROTOCOL_CONFIG)
@ -362,7 +369,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.info("Herder starting");
herderThread = Thread.currentThread();
try (TickThreadStage stage = new TickThreadStage("reading to the end of internal topics")) {
startServices();
}
log.info("Herder started");
running = true;
@ -371,6 +380,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
tick();
}
recordTickThreadStage("shutting down");
halt();
log.info("Herder stopped");
@ -402,7 +412,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
log.debug("Ensuring group membership is still active");
member.ensureActive();
String stageDescription = "ensuring membership in the cluster";
member.ensureActive(() -> new TickThreadStage(stageDescription));
completeTickThreadStage();
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
if (!handleRebalanceCompleted()) return;
} catch (WakeupException e) {
@ -418,7 +430,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// We were accidentally fenced out, possibly by a zombie leader
try {
log.debug("Reclaiming write privileges for config topic after being fenced out");
try (TickThreadStage stage = new TickThreadStage("reclaiming write privileges for the config topic")) {
configBackingStore.claimWritePrivileges();
}
fencedFromConfigTopic = false;
log.debug("Successfully reclaimed write privileges for config topic after being fenced out");
} catch (Exception e) {
@ -440,7 +454,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
keyExpiration = Long.MAX_VALUE;
try {
SessionKey newSessionKey = new SessionKey(keyGenerator.generateKey(), now);
writeToConfigTopicAsLeader(() -> configBackingStore.putSessionKey(newSessionKey));
writeToConfigTopicAsLeader(
"writing a new session key to the config topic",
() -> configBackingStore.putSessionKey(newSessionKey)
);
} catch (Exception e) {
log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying", e);
canReadConfigs = false;
@ -461,7 +478,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (next == null) {
break;
} else if (now >= next.at) {
requests.pollFirst();
currentRequest = requests.pollFirst();
} else {
scheduledTick = next.at;
break;
@ -534,7 +551,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.trace("Polling for group activity; will wait for {}ms or until poll is interrupted by "
+ "either config backing store updates or a new external request",
nextRequestTimeoutMs);
member.poll(nextRequestTimeoutMs);
String pollDurationDescription = scheduledTick != null ? "for up to " + nextRequestTimeoutMs + "ms or " : "";
String stageDescription = "polling the group coordinator " + pollDurationDescription + "until interrupted";
member.poll(nextRequestTimeoutMs, () -> new TickThreadStage(stageDescription));
completeTickThreadStage();
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
handleRebalanceCompleted();
} catch (WakeupException e) { // FIXME should not be WakeupException
@ -694,13 +714,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
boolean remains = configState.contains(connectorName);
log.info("Handling connector-only config update by {} connector {}",
remains ? "restarting" : "stopping", connectorName);
try (TickThreadStage stage = new TickThreadStage("stopping connector " + connectorName)) {
worker.stopAndAwaitConnector(connectorName);
}
// The update may be a deletion, so verify we actually need to restart the connector
if (remains) {
connectorsToStart.add(getConnectorStartingCallable(connectorName));
}
}
startAndStop(connectorsToStart);
String stageDescription = "restarting " + connectorsToStart.size() + " reconfigured connectors";
startAndStop(connectorsToStart, stageDescription);
}
private void processTargetStateChanges(Set<String> connectorTargetStateChanges) {
@ -758,7 +781,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
log.info("Handling task config update by stopping tasks {}, which will be restarted after rebalance if still assigned to this worker", tasksToStop);
try (TickThreadStage stage = new TickThreadStage("stopping " + tasksToStop.size() + " reconfigured tasks")) {
worker.stopAndAwaitTasks(tasksToStop);
}
tasksToRestart.addAll(tasksToStop);
}
@ -824,7 +849,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
callback.onCompletion(null, configState.connectors());
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -845,7 +870,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -865,7 +890,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -894,12 +919,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
} else {
log.trace("Removing connector config {} {}", connName, configState.connectors());
writeToConfigTopicAsLeader(() -> configBackingStore.removeConnectorConfig(connName));
writeToConfigTopicAsLeader(
"removing the config for connector " + connName + " from the config topic",
() -> configBackingStore.removeConnectorConfig(connName)
);
callback.onCompletion(null, new Created<>(false, null));
}
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -1058,7 +1086,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.trace("Submitting connector config write request {}", connName);
addRequest(
() -> {
validateConnectorConfig(config, (error, configInfos) -> {
validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> {
if (error != null) {
callback.onCompletion(error, null);
return;
@ -1085,7 +1113,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config, targetState));
writeToConfigTopicAsLeader(
"writing a config for connector " + connName + " to the config topic",
() -> configBackingStore.putConnectorConfig(connName, config, targetState)
);
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.
@ -1094,12 +1125,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
callback.onCompletion(null, new Created<>(!exists, info));
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
});
}));
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -1124,7 +1155,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// a non-empty set of task configs). A STOPPED connector with a non-empty set of tasks is less acceptable
// and likely to confuse users.
writeTaskConfigs(connName, Collections.emptyList());
String stageDescription = "writing the STOPPED target stage for connector " + connName + " to the config topic";
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
configBackingStore.putTargetState(connName, TargetState.STOPPED);
}
// Force a read of the new target state for the connector
if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
log.warn("Failed to read to end of config topic after writing the STOPPED target state for connector {}", connName);
@ -1133,7 +1167,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
callback.onCompletion(null, null);
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -1176,7 +1210,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -1199,7 +1233,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -1232,7 +1266,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), fenceUrl);
forwardRequestExecutor.execute(() -> {
try {
String stageDescription = "Forwarding zombie fencing request to the leader at " + workerUrl;
try (TemporaryStage stage = new TemporaryStage(stageDescription, callback, time)) {
restClient.httpRequest(fenceUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
}
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
@ -1261,7 +1298,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
addRequest(() -> {
doFenceZombieSourceTasks(connName, callback);
return null;
}, forwardErrorCallback(callback));
}, forwardErrorAndTickThreadStages(callback));
}
private void doFenceZombieSourceTasks(String connName, Callback<Void> callback) {
@ -1325,7 +1362,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.debug("Skipping zombie fencing round but writing task count record for connector {} "
+ "as both the most recent and the current generation of task configs only contain one task", connName);
}
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, taskCount));
writeToConfigTopicAsLeader(
"writing a task count record for connector " + connName + " to the config topic",
() -> configBackingStore.putTaskCountRecord(connName, taskCount)
);
}
callback.onCompletion(null, null);
}
@ -1351,7 +1391,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (assignment.connectors().contains(connName)) {
try {
try (TickThreadStage stage = new TickThreadStage("stopping restarted connector " + connName)) {
worker.stopAndAwaitConnector(connName);
}
startConnector(connName, callback);
} catch (Throwable t) {
callback.onCompletion(t, null);
@ -1363,7 +1405,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
return null;
},
forwardErrorCallback(callback));
forwardErrorAndTickThreadStages(callback));
}
@Override
@ -1384,7 +1426,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
if (assignment.tasks().contains(id)) {
try {
try (TickThreadStage stage = new TickThreadStage("restarting task " + id)) {
worker.stopAndAwaitTask(id);
if (startTask(id))
callback.onCompletion(null, null);
@ -1400,7 +1442,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
return null;
},
forwardErrorCallback(callback));
forwardErrorAndTickThreadStages(callback));
}
@Override
@ -1422,7 +1464,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
if (isLeader()) {
// Write a restart request to the config backing store, to be executed asynchronously in tick()
String stageDescription = "writing restart request for connector " + request.connectorName() + " to the config topic";
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
configBackingStore.putRestartRequest(request);
}
// Compute and send the response that this was accepted
Optional<RestartPlan> plan = buildRestartPlan(request);
if (!plan.isPresent()) {
@ -1435,7 +1480,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
}
@ -1455,7 +1500,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
pendingRestartRequests.clear();
}
restartRequests.forEach(restartRequest -> {
try {
String stageDescription = "handling restart request for connector " + restartRequest.connectorName();
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
doRestartConnectorAndTasks(restartRequest);
} catch (Exception e) {
log.warn("Unexpected error while trying to process " + restartRequest + ", the restart request will be skipped.", e);
@ -1488,12 +1534,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
final boolean restartConnector = plan.shouldRestartConnector() && currentAssignments.connectors().contains(connectorName);
final boolean restartTasks = !assignedIdsToRestart.isEmpty();
if (restartConnector) {
String stageDescription = "stopping to-be-restarted connector " + connectorName;
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
worker.stopAndAwaitConnector(connectorName);
}
onRestart(connectorName);
}
if (restartTasks) {
String stageDescription = "stopping " + assignedIdsToRestart.size() + " to-be-restarted tasks for connector " + connectorName;
// Stop the tasks and mark as restarting
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
worker.stopAndAwaitTasks(assignedIdsToRestart);
}
assignedIdsToRestart.forEach(this::onRestart);
}
@ -1538,7 +1590,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
super.connectorOffsets(connName, cb);
return null;
},
forwardErrorCallback(cb)
forwardErrorAndTickThreadStages(cb)
);
}
@ -1563,18 +1615,22 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// We need to ensure that we perform the necessary checks again before proceeding to actually altering / resetting the connector offsets since
// zombie fencing is done asynchronously and the conditions could have changed since the previous check
addRequest(() -> {
try (TickThreadStage stage = new TickThreadStage("modifying offsets for connector " + connName)) {
if (modifyConnectorOffsetsChecks(connName, callback)) {
worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
}
}
return null;
}, forwardErrorCallback(callback));
}, forwardErrorAndTickThreadStages(callback));
}
});
} else {
try (TickThreadStage stage = new TickThreadStage("modifying offsets for connector " + connName)) {
worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
}
}
return null;
}, forwardErrorCallback(callback));
}, forwardErrorAndTickThreadStages(callback));
}
/**
@ -1641,11 +1697,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
* Note that it is not necessary to wrap every write to the config topic in this method, only the writes that should be performed
* exclusively by the leader. For example, {@link ConfigBackingStore#putTargetState(String, TargetState)} does not require this
* method, as it can be invoked by any worker in the cluster.
* @param description a description of the action; e.g., "writing 5 task configs for connector file-source"
* @param write the action that writes to the config topic, such as {@link ConfigBackingStore#putSessionKey(SessionKey)} or
* {@link ConfigBackingStore#putConnectorConfig(String, Map, TargetState)}.
*/
private void writeToConfigTopicAsLeader(Runnable write) {
try {
private void writeToConfigTopicAsLeader(String description, Runnable write) {
try (TickThreadStage stage = new TickThreadStage(description)) {
write.run();
} catch (PrivilegedWriteException e) {
log.warn("Failed to write to config topic as leader; will rejoin group if necessary and, if still leader, attempt to reclaim write privileges for the config topic", e);
@ -1782,7 +1839,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
* @return true if successful; false if timed out
*/
private boolean refreshConfigSnapshot(long timeoutMs) {
try {
try (TickThreadStage stage = new TickThreadStage("reading to the end of the config topic")) {
configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
configState = configBackingStore.snapshot();
log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
@ -1796,14 +1853,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private void backoff(long ms) {
if (ConnectProtocolCompatibility.fromProtocolVersion(currentProtocolVersion) == EAGER) {
String stageDescription = "sleeping for a backoff duration of " + ms + "ms";
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
time.sleep(ms);
}
return;
}
if (backoffRetries > 0) {
int rebalanceDelayFraction =
config.getInt(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG) / 10 / backoffRetries;
String stageDescription = "sleeping for a backoff duration of " + rebalanceDelayFraction + "ms";
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
time.sleep(rebalanceDelayFraction);
}
--backoffRetries;
return;
}
@ -1820,8 +1883,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
// Visible for testing
void startAndStop(Collection<Callable<Void>> callables) {
try {
void startAndStop(Collection<Callable<Void>> callables, String stageDescription) {
if (callables.isEmpty())
return;
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
startAndStopExecutor.invokeAll(callables);
} catch (InterruptedException e) {
// ignore
@ -1864,7 +1930,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
startAndStop(callables);
String stageDescription = "starting " + callables.size() + " connector(s) and task(s) after a rebalance";
startAndStop(callables, stageDescription);
synchronized (this) {
runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
@ -1982,12 +2049,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
callback.onCompletion(null, null);
return null;
},
forwardErrorCallback(callback)
forwardErrorAndTickThreadStages(callback)
);
} else {
callback.onCompletion(null, null);
}
};
callback.recordStage(new Stage("starting the connector", time.milliseconds()));
worker.startConnector(connectorName, configProps, ctx, this, initialState, onInitialStateChange);
}
@ -2101,8 +2169,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
} else {
connConfig = new SourceConnectorConfig(plugins(), configs, worker.isTopicCreationEnabled());
}
final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
final List<Map<String, String>> taskProps;
try (TickThreadStage stage = new TickThreadStage("generating task configs for connector " + connName)) {
taskProps = worker.connectorTaskConfigs(connName, connConfig);
}
publishConnectorTaskConfigs(connName, taskProps, cb);
} catch (Throwable t) {
cb.onCompletion(t, null);
@ -2144,7 +2214,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
.build()
.toString();
log.trace("Forwarding task configurations for connector {} to leader", connName);
String stageDescription = "Forwarding task configurations to the leader at " + leaderUrl;
try (TemporaryStage stage = new TemporaryStage(stageDescription, cb, time)) {
restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
}
cb.onCompletion(null, null);
} catch (ConnectException e) {
log.error("Request to leader to reconfigure connector tasks failed", e);
@ -2160,7 +2233,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
throw new BadRequestException("Cannot submit non-empty set of task configs for stopped connector " + connName);
}
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, taskConfigs));
writeToConfigTopicAsLeader(
"writing " + taskConfigs.size() + " task configs for connector " + connName + " to the config topic",
() -> configBackingStore.putTaskConfigs(connName, taskConfigs)
);
}
// Invoked by exactly-once worker source tasks after they have successfully initialized their transactional
@ -2175,7 +2251,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
addRequest(
() -> verifyTaskGenerationAndOwnership(id, initialTaskGen, verifyCallback),
forwardErrorCallback(verifyCallback)
forwardErrorAndTickThreadStages(verifyCallback)
);
try {
@ -2248,6 +2324,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// queue was added
if (peekWithoutException() == req)
member.wakeup();
callback.recordStage(tickThreadStage);
return req;
}
@ -2257,6 +2334,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
} finally {
currentRequest = null;
}
}
@ -2439,11 +2518,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
private static Callback<Void> forwardErrorCallback(final Callback<?> callback) {
return (error, result) -> {
private static Callback<Void> forwardErrorAndTickThreadStages(final Callback<?> callback) {
Callback<Void> cb = callback.chainStaging((error, result) -> {
if (error != null)
callback.onCompletion(error, null);
};
});
return cb;
}
private void updateDeletedConnectorStatus() {
@ -2574,7 +2654,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// The actual timeout for graceful task/connector stop is applied in worker's
// stopAndAwaitTask/stopAndAwaitConnector methods.
startAndStop(callables);
String stageDescription = "stopping " + connectors.size() + " and " + tasks.size() + " tasks";
startAndStop(callables, stageDescription);
log.info("Finished stopping tasks in preparation for rebalance");
synchronized (DistributedHerder.this) {
@ -2593,7 +2674,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// Ensure that all status updates have been pushed to the storage system before rebalancing.
// Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
// completes.
try (TickThreadStage stage = new TickThreadStage("flushing updates to the status topic")) {
statusBackingStore.flush();
}
log.info("Finished flushing status backing store in preparation for rebalance");
} else {
log.info("Wasn't able to resume work after last rebalance, can skip stopping connectors and tasks");
@ -2601,6 +2684,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
private void resetActiveTopics(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
String stageDescription = "resetting the list of active topics for " + connectors.size() + " and " + tasks.size() + " tasks";
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
connectors.stream()
.filter(connectorName -> !configState.contains(connectorName))
.forEach(DistributedHerder.this::resetConnectorActiveTopics);
@ -2611,6 +2696,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
.forEach(DistributedHerder.this::resetConnectorActiveTopics);
}
}
}
private boolean isSourceConnector(String connName) {
return ConnectorType.SOURCE.equals(connectorType(configState.connectorConfig(connName)));
@ -2666,6 +2752,43 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
return result;
}
private synchronized void recordTickThreadStage(String description) {
assert description != null;
log.trace("Recording new tick thread stage: {}", description);
// Preserve the current stage to report to requests submitted after this method is invoked
this.tickThreadStage = new Stage(description, time.milliseconds());
// Report the current stage to all pending requests
requests.forEach(request -> request.callback().recordStage(tickThreadStage));
// And, if there is an in-progress request, report the stage to it as well
Optional.ofNullable(currentRequest).map(DistributedHerderRequest::callback)
.ifPresent(callback -> callback.recordStage(tickThreadStage));
}
private synchronized void completeTickThreadStage() {
// This is expected behavior with nested stages; can just no-op
if (tickThreadStage == null)
return;
log.trace("Completing current tick thread stage; was {}", tickThreadStage);
tickThreadStage.complete(time.milliseconds());
tickThreadStage = null;
}
private class TickThreadStage implements UncheckedCloseable {
public TickThreadStage(String description) {
if (description != null)
recordTickThreadStage(description);
}
@Override
public void close() {
completeTickThreadStage();
}
}
/**
* Represents an active zombie fencing: that is, an in-progress attempt to invoke
* {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
@ -2699,6 +2822,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (fencingFuture != null) {
throw new IllegalStateException("Cannot invoke start() multiple times");
}
String stageDescription = "initiating a round of zombie fencing for connector " + connName;
try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
// This callback will be called on the same thread that invokes KafkaFuture::thenApply if
// the future is already completed. Since that thread is the herder tick thread, we don't need
@ -2711,6 +2836,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
awaitFollowup();
return null;
});
}
// Immediately after the fencing and necessary followup work (i.e., writing the task count record to the config topic)
// is complete, remove this from the list of active fencings
addCallback((ignored, error) -> {
@ -2736,7 +2862,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (fencingFollowup.isDone()) {
return null;
}
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord));
writeToConfigTopicAsLeader(
"writing a task count record for connector " + connName + " to the config topic",
() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord)
);
return null;
}

View File

@ -42,11 +42,14 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.common.utils.Utils.UncheckedCloseable;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
/**
* This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments
* to workers.
@ -120,7 +123,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
return super.ensureCoordinatorReady(timer);
}
public void poll(long timeout) {
public void poll(long timeout, Supplier<UncheckedCloseable> onPoll) {
// poll for io until the timeout expires
final long start = time.milliseconds();
long now = start;
@ -130,7 +133,11 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
if (coordinatorUnknown()) {
log.debug("Broker coordinator is marked unknown. Attempting discovery with a timeout of {}ms",
coordinatorDiscoveryTimeoutMs);
if (ensureCoordinatorReady(time.timer(coordinatorDiscoveryTimeoutMs))) {
boolean coordinatorReady;
try (UncheckedCloseable polling = onPoll.get()) {
coordinatorReady = ensureCoordinatorReady(time.timer(coordinatorDiscoveryTimeoutMs));
}
if (coordinatorReady) {
log.debug("Broker coordinator is ready");
} else {
log.debug("Can not connect to broker coordinator");
@ -146,7 +153,9 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
}
if (rejoinNeededOrPending()) {
try (UncheckedCloseable polling = onPoll.get()) {
ensureActiveGroup();
}
now = time.milliseconds();
}
@ -158,7 +167,9 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
// Note that because the network client is shared with the background heartbeat thread,
// we do not want to block in poll longer than the time to the next heartbeat.
long pollTimeout = Math.min(Math.max(0, remaining), timeToNextHeartbeat(now));
try (UncheckedCloseable polling = onPoll.get()) {
client.poll(time.timer(pollTimeout));
}
now = time.milliseconds();
elapsed = now - start;

View File

@ -48,6 +48,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.apache.kafka.common.utils.Utils.UncheckedCloseable;
/**
* This class manages the coordination process with brokers for the Connect cluster group membership. It ties together
@ -158,14 +161,14 @@ public class WorkerGroupMember {
* Ensure that the connection to the broker coordinator is up and that the worker is an
* active member of the group.
*/
public void ensureActive() {
coordinator.poll(0);
public void ensureActive(Supplier<UncheckedCloseable> onPoll) {
coordinator.poll(0, onPoll);
}
public void poll(long timeout) {
public void poll(long timeout, Supplier<UncheckedCloseable> onPoll) {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
coordinator.poll(timeout);
coordinator.poll(timeout, onPoll);
}
/**

View File

@ -21,12 +21,15 @@ import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.StagedTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -38,7 +41,7 @@ public class HerderRequestHandler {
private final RestClient restClient;
private long requestTimeoutMs;
private volatile long requestTimeoutMs;
public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
this.restClient = restClient;
@ -64,6 +67,22 @@ public class HerderRequestHandler {
return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw e.getCause();
} catch (StagedTimeoutException e) {
String message;
Stage stage = e.stage();
if (stage.completed() != null) {
message = "Request timed out. The last operation the worker completed was "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started()) + " and completed at "
+ Instant.ofEpochMilli(stage.completed());
} else {
message = "Request timed out. The worker is currently "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started());
}
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
// error is the best option
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), message);
} catch (TimeoutException e) {
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
// error is the best option

View File

@ -27,6 +27,8 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.StagedTimeoutException;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
@ -39,6 +41,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -107,6 +110,22 @@ public class ConnectorPluginsResource implements ConnectResource {
try {
return validationCallback.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (StagedTimeoutException e) {
Stage stage = e.stage();
String message;
if (stage.completed() != null) {
message = "Request timed out. The last operation the worker completed was "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started()) + " and completed at "
+ Instant.ofEpochMilli(stage.completed());
} else {
message = "Request timed out. The worker is currently "
+ stage.description() + ", which began at "
+ Instant.ofEpochMilli(stage.started());
}
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
// error is the best option
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), message);
} catch (TimeoutException e) {
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
// error is the best option

View File

@ -27,4 +27,22 @@ public interface Callback<V> {
* @param result the return value, or null if the operation failed
*/
void onCompletion(Throwable error, V result);
default void recordStage(Stage stage) {
}
default <V2> Callback<V2> chainStaging(Callback<V2> chained) {
return new Callback<V2>() {
@Override
public void recordStage(Stage stage) {
Callback.this.recordStage(stage);
}
@Override
public void onCompletion(Throwable error, V2 result) {
chained.onCompletion(error, result);
}
};
}
}

View File

@ -40,6 +40,7 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
private volatile T result = null;
private volatile Throwable exception = null;
private volatile boolean cancelled = false;
private volatile Stage currentStage = null;
public ConvertingFutureCallback() {
this(null);
@ -110,11 +111,22 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
@Override
public T get(long l, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
if (!finishedLatch.await(l, timeUnit))
throw new TimeoutException("Timed out waiting for future");
if (!finishedLatch.await(l, timeUnit)) {
Stage stage = currentStage;
if (stage != null) {
throw new StagedTimeoutException(stage);
} else {
throw new TimeoutException();
}
}
return result();
}
@Override
public void recordStage(Stage stage) {
this.currentStage = stage;
}
private T result() throws ExecutionException {
if (cancelled) {
throw new CancellationException();

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Stage {
private static final Logger log = LoggerFactory.getLogger(Stage.class);
private final String description;
private final long started;
private volatile Long completed;
public Stage(String description, long started) {
this.description = description;
this.started = started;
this.completed = null;
}
public String description() {
return description;
}
public long started() {
return started;
}
public Long completed() {
return completed;
}
public synchronized void complete(long time) {
if (time < started) {
log.warn("Ignoring invalid completion time {} since it is before this stage's start time of {}", time, started);
return;
}
if (completed != null) {
log.warn("Ignoring completion time of {} since this stage was already completed at {}", time, completed);
return;
}
this.completed = time;
}
@Override
public String toString() {
return description + "(started " + started + ", completed=" + completed() + ")";
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
public class StagedTimeoutException extends TimeoutException {
private final Stage stage;
public StagedTimeoutException(Stage stage) {
super();
this.stage = Objects.requireNonNull(stage, "Stage may not be null");
}
public Stage stage() {
return stage;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.apache.kafka.common.utils.Time;
public class TemporaryStage implements AutoCloseable {
private final Stage stage;
private final Time time;
public TemporaryStage(String description, Callback<?> callback, Time time) {
this.stage = new Stage(description, time.milliseconds());
this.time = time;
callback.recordStage(stage);
}
@Override
public void close() {
stage.complete(time.milliseconds());
}
}

View File

@ -133,6 +133,11 @@ public class BlockingConnectorTest {
NUM_WORKERS,
"Initial group of workers did not start in time"
);
try (Response response = connect.requestGet(connect.endpointForResource("connectors/nonexistent"))) {
// hack: make sure the worker is actually up (has joined the cluster, created and read to the end of internal topics, etc.)
assertEquals(404, response.getStatus());
}
}
@After
@ -145,7 +150,11 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorValidate() throws Exception {
log.info("Starting test testBlockInConnectorValidate");
assertRequestTimesOut("create connector that blocks during validation", () -> createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE));
assertRequestTimesOut(
"create connector that blocks during validation",
() -> createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE),
"The worker is currently performing multi-property validation for the connector"
);
// Will NOT assert that connector has failed, since the request should fail before it's even created
// Connector should already be blocked so this should return immediately, but check just to
@ -159,7 +168,11 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorConfig() throws Exception {
log.info("Starting test testBlockInConnectorConfig");
assertRequestTimesOut("create connector that blocks while getting config", () -> createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG));
assertRequestTimesOut(
"create connector that blocks while getting config",
() -> createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG),
"The worker is currently retrieving the configuration definition from the connector"
);
// Will NOT assert that connector has failed, since the request should fail before it's even created
// Connector should already be blocked so this should return immediately, but check just to
@ -178,6 +191,13 @@ public class BlockingConnectorTest {
createNormalConnector();
verifyNormalConnector();
// Try to restart the connector
assertRequestTimesOut(
"restart connector that blocks during initialize",
() -> connect.restartConnector(BLOCKING_CONNECTOR_NAME),
"The worker is currently starting the connector"
);
}
@Test
@ -188,6 +208,13 @@ public class BlockingConnectorTest {
createNormalConnector();
verifyNormalConnector();
// Try to restart the connector
assertRequestTimesOut(
"restart connector that blocks during start",
() -> connect.restartConnector(BLOCKING_CONNECTOR_NAME),
"The worker is currently starting the connector"
);
}
@Test
@ -329,7 +356,7 @@ public class BlockingConnectorTest {
normalConnectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
}
private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request) {
private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request, String expectedTimeoutMessage) {
// Artificially reduce the REST request timeout so that these don't take 90 seconds
connect.requestTimeout(REDUCED_REST_REQUEST_TIMEOUT);
ConnectRestException exception = assertThrows(
@ -345,6 +372,12 @@ public class BlockingConnectorTest {
+ "; instead, message was: " + exception.getMessage(),
exception.getMessage().contains("Request timed out")
);
if (expectedTimeoutMessage != null) {
assertTrue(
"Timeout error message '" + exception.getMessage() + "' does not match expected format",
exception.getMessage().contains(expectedTimeoutMessage)
);
}
// Reset the REST request timeout so that other requests aren't impacted
connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
@ -44,7 +45,9 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@ -58,11 +61,16 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.resources.ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@ -771,6 +779,111 @@ public class ConnectWorkerIntegrationTest {
return props;
}
@Test
public void testRequestTimeouts() throws Exception {
final String configTopic = "test-request-timeout-configs";
workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
// Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
// be spuriously triggered after the group coordinator for a Connect cluster is bounced
workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
connect = connectBuilder
.numBrokers(1)
.numWorkers(1)
.build();
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(1,
"Worker did not start in time");
Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
// Create a connector to ensure that the worker has completed startup
log.info("Creating initial connector");
connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time"
);
// Bring down Kafka, which should cause some REST requests to fail
log.info("Stopping Kafka cluster");
connect.kafka().stopOnlyKafka();
// Try to reconfigure the connector, which should fail with a timeout error
log.info("Trying to reconfigure connector while Kafka cluster is down");
assertTimeoutException(
() -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2),
"flushing updates to the status topic"
);
log.info("Restarting Kafka cluster");
connect.kafka().startOnlyKafkaOnSamePorts();
connect.assertions().assertExactlyNumBrokersAreUp(1, "Broker did not complete startup in time");
log.info("Kafka cluster is restarted");
// Reconfigure the connector to ensure that the broker has completed startup
log.info("Reconfiguring connector with one more task");
connect.configureConnector(CONNECTOR_NAME, connectorConfig2);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME, NUM_TASKS + 1, "connector and tasks did not start in time"
);
// Delete the config topic--what could possibly go wrong?
log.info("Deleting Kafka Connect config topic");
connect.kafka().deleteTopic(configTopic);
// Try to delete the connector, which should fail with a slightly different timeout error
log.info("Trying to delete connector after config topic has been deleted");
assertTimeoutException(
() -> connect.deleteConnector(CONNECTOR_NAME),
"removing the config for connector " + CONNECTOR_NAME + " from the config topic"
);
// The worker should still be blocked on the same operation
log.info("Trying again to reconfigure connector after config topic has been deleted");
assertTimeoutException(
() -> connect.configureConnector(CONNECTOR_NAME, connectorConfig1),
"removing the config for connector " + CONNECTOR_NAME + " from the config topic"
);
}
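/**
* Assert that the given operation eventually fails with a REST timeout error whose message
* names the expected stage that the worker is blocked on. The worker's REST request timeout
* is temporarily lowered while the operation is retried, then restored to the default.
*/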
private void assertTimeoutException(Runnable operation, String expectedStageDescription) throws InterruptedException {
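// Use a short REST request timeout so that the expected timeout error surfaces quickly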
connect.requestTimeout(1_000);
AtomicReference<Throwable> latestError = new AtomicReference<>();
waitForCondition(
() -> {
try {
operation.run();
latestError.set(null);
return false;
} catch (Throwable t) {
latestError.set(t);
assertTrue(t instanceof ConnectRestException);
ConnectRestException restException = (ConnectRestException) t;
assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), restException.statusCode());
assertNotNull(restException.getMessage());
assertTrue(
"Message '" + restException.getMessage() + "' does not match expected format",
restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription)
);
return true;
}
},
30_000,
() -> {
String baseMessage = "REST request did not fail with the expected timeout error message in time. ";
Throwable t = latestError.get();
if (t == null) {
return baseMessage + "The most recent request did not fail.";
} else {
return baseMessage + "Most recent error: " + t;
}
}
);
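// Restore the default REST request timeout so that subsequent requests are unaffected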
connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup props for the source connector
Map<String, String> props = new HashMap<>();

View File

@ -360,7 +360,7 @@ public class AbstractHerderTest {
config.put("required", "value");
config.put("testKey", null);
final ConfigInfos configInfos = herder.validateConnectorConfig(config, false);
final ConfigInfos configInfos = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(1, configInfos.errorCount());
assertErrorForKey(configInfos, "testKey");
@ -378,7 +378,7 @@ public class AbstractHerderTest {
config.put("testKey", null);
config.put("secondTestKey", null);
final ConfigInfos configInfos = herder.validateConnectorConfig(config, false);
final ConfigInfos configInfos = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(2, configInfos.errorCount());
assertErrorForKey(configInfos, "testKey");
@ -448,7 +448,7 @@ public class AbstractHerderTest {
public void testConfigValidationEmptyConfig() {
AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy, 0);
assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(Collections.emptyMap(), false));
assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(Collections.emptyMap(), s -> null, false));
verify(transformer).transform(Collections.emptyMap());
}
@ -458,7 +458,7 @@ public class AbstractHerderTest {
AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
ConfigInfos result = herder.validateConnectorConfig(config, false);
ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
@ -493,7 +493,7 @@ public class AbstractHerderTest {
config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
ConfigInfos validation = herder.validateConnectorConfig(config, false);
ConfigInfos validation = herder.validateConnectorConfig(config, s -> null, false);
ConfigInfo topicsListInfo = findInfo(validation, SinkConnectorConfig.TOPICS_CONFIG);
assertNotNull(topicsListInfo);
@ -516,7 +516,7 @@ public class AbstractHerderTest {
config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1");
config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
ConfigInfos validation = herder.validateConnectorConfig(config, false);
ConfigInfos validation = herder.validateConnectorConfig(config, s -> null, false);
ConfigInfo topicsListInfo = findInfo(validation, SinkConnectorConfig.TOPICS_CONFIG);
assertNotNull(topicsListInfo);
@ -535,7 +535,7 @@ public class AbstractHerderTest {
config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
ConfigInfos validation = herder.validateConnectorConfig(config, false);
ConfigInfos validation = herder.validateConnectorConfig(config, s -> null, false);
ConfigInfo topicsRegexInfo = findInfo(validation, SinkConnectorConfig.TOPICS_REGEX_CONFIG);
assertNotNull(topicsRegexInfo);
@ -560,7 +560,7 @@ public class AbstractHerderTest {
config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB");
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
config.put("required", "value"); // connector required config
ConfigInfos result = herder.validateConnectorConfig(config, false);
ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
@ -616,7 +616,7 @@ public class AbstractHerderTest {
config.put(ConnectorConfig.PREDICATES_CONFIG + ".predX.type", SamplePredicate.class.getName());
config.put("required", "value"); // connector required config
ConfigInfos result = herder.validateConnectorConfig(config, false);
ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
@ -682,7 +682,7 @@ public class AbstractHerderTest {
config.put(ackConfigKey, "none");
config.put(saslConfigKey, "jaas_config");
ConfigInfos result = herder.validateConnectorConfig(config, false);
ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
// We expect there to be errors due to the not allowed override policy for ACKS.... Note that these assertions depend heavily on
@ -741,7 +741,7 @@ public class AbstractHerderTest {
overriddenClientConfigs.add(bootstrapServersConfigKey);
overriddenClientConfigs.add(loginCallbackHandlerConfigKey);
ConfigInfos result = herder.validateConnectorConfig(config, false);
ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Map<String, String> validatedOverriddenClientConfigs = new HashMap<>();

View File

@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.waitForCondition;
@ -302,20 +303,16 @@ public class ConnectAssertions {
*/
public void assertConnectorAndAtLeastNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
waitForConnectorState(
connectorName,
AbstractStatus.State.RUNNING,
numTasks,
atLeast(numTasks),
null,
AbstractStatus.State.RUNNING,
(actual, expected) -> actual >= expected
).orElse(false),
CONNECTOR_SETUP_DURATION_MS,
"The connector or at least " + numTasks + " of tasks are not running.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
"The connector or at least " + numTasks + " of tasks are not running.",
detailMessage,
CONNECTOR_SETUP_DURATION_MS
);
}
/**
@ -329,20 +326,16 @@ public class ConnectAssertions {
*/
public void assertConnectorAndExactlyNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
waitForConnectorState(
connectorName,
AbstractStatus.State.RUNNING,
numTasks,
exactly(numTasks),
null,
AbstractStatus.State.RUNNING,
(actual, expected) -> actual == expected
).orElse(false),
CONNECTOR_SETUP_DURATION_MS,
"The connector or exactly " + numTasks + " tasks are not running.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
"The connector or exactly " + numTasks + " tasks are not running.",
detailMessage,
CONNECTOR_SETUP_DURATION_MS
);
}
/**
@ -356,20 +349,16 @@ public class ConnectAssertions {
*/
public void assertConnectorAndExactlyNumTasksArePaused(String connectorName, int numTasks, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
waitForConnectorState(
connectorName,
AbstractStatus.State.PAUSED,
numTasks,
exactly(numTasks),
null,
AbstractStatus.State.PAUSED,
Integer::equals
).orElse(false),
CONNECTOR_SHUTDOWN_DURATION_MS,
"The connector or exactly " + numTasks + " tasks are not paused.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
"The connector or exactly " + numTasks + " tasks are not paused.",
detailMessage,
CONNECTOR_SHUTDOWN_DURATION_MS
);
}
/**
@ -383,24 +372,20 @@ public class ConnectAssertions {
*/
public void assertConnectorIsRunningAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
waitForConnectorState(
connectorName,
AbstractStatus.State.RUNNING,
numTasks,
exactly(numTasks),
null,
AbstractStatus.State.FAILED,
(actual, expected) -> actual >= expected
).orElse(false),
CONNECTOR_SETUP_DURATION_MS,
"Either the connector is not running or not all the " + numTasks + " tasks have failed.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
"Either the connector is not running or not all the " + numTasks + " tasks have failed.",
detailMessage,
CONNECTOR_SETUP_DURATION_MS
);
}
/**
* Assert that a connector is running, that it has a specific number of tasks out of that numFailedTasks are in the FAILED state.
* Assert that a connector is running, that it has a specific number of tasks, and out of those, numFailedTasks are in the FAILED state.
*
* @param connectorName the connector name
* @param numTasks the number of tasks
@ -410,21 +395,16 @@ public class ConnectAssertions {
*/
public void assertConnectorIsRunningAndNumTasksHaveFailed(String connectorName, int numTasks, int numFailedTasks, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
waitForConnectorState(
connectorName,
AbstractStatus.State.RUNNING,
numTasks,
exactly(numTasks),
numFailedTasks,
AbstractStatus.State.FAILED,
(actual, expected) -> actual >= expected
).orElse(false),
CONNECTOR_SETUP_DURATION_MS,
"Either the connector is not running or not all the " + numTasks + " tasks have failed.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
"Either the connector is not running or not all the " + numTasks + " tasks have failed.",
detailMessage,
CONNECTOR_SETUP_DURATION_MS
);
}
/**
@ -438,20 +418,16 @@ public class ConnectAssertions {
*/
public void assertConnectorIsFailedAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
waitForConnectorState(
connectorName,
AbstractStatus.State.FAILED,
numTasks,
exactly(numTasks),
null,
AbstractStatus.State.FAILED,
(actual, expected) -> actual >= expected
).orElse(false),
CONNECTOR_SETUP_DURATION_MS,
"Either the connector is running or not all the " + numTasks + " tasks have failed.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
"Either the connector is running or not all the " + numTasks + " tasks have failed.",
detailMessage,
CONNECTOR_SETUP_DURATION_MS
);
}
/**
@ -500,82 +476,84 @@ public class ConnectAssertions {
*/
public void assertConnectorIsStopped(String connectorName, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
waitForConnectorState(
connectorName,
AbstractStatus.State.STOPPED,
0,
exactly(0),
null,
Integer::equals
).orElse(false),
CONNECTOR_SHUTDOWN_DURATION_MS,
"At least the connector or one of its tasks is still running");
null,
"At least the connector or one of its tasks is still running",
detailMessage,
CONNECTOR_SHUTDOWN_DURATION_MS
);
}
/**
* Wait for the connector to reach the given state, with a number of tasks that satisfies the
* given predicate and some number of those tasks in the given task state.
* @param connectorName the connector name
* @param connectorState the expected state of the connector
* @param expectedNumTasks predicate that the observed number of tasks must satisfy
* @param numTasksInTasksState the number of tasks expected to be in {@code tasksState}, or null if all tasks should be in that state
* @param tasksState the expected state of the tasks
* @param conditionMessage a description of the awaited condition, used if the wait times out
* @param detailMessage additional context to include in the resulting assertion error
* @param maxWaitMs the maximum time to wait, in milliseconds
*/
protected void waitForConnectorState(
String connectorName,
AbstractStatus.State connectorState,
Predicate<Integer> expectedNumTasks,
Integer numTasksInTasksState,
AbstractStatus.State tasksState,
String conditionMessage,
String detailMessage,
long maxWaitMs
) throws InterruptedException {
AtomicReference<ConnectorStateInfo> lastInfo = new AtomicReference<>();
AtomicReference<Exception> lastInfoError = new AtomicReference<>();
try {
waitForCondition(
() -> {
try {
ConnectorStateInfo info = connect.connectorStatus(connectorName);
lastInfo.set(info);
lastInfoError.set(null);
if (info == null)
return false;
int numTasks = info.tasks().size();
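// A null numTasksInTasksState means that every observed task is expected to be in tasksState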
int expectedTasksInState = Optional.ofNullable(numTasksInTasksState).orElse(numTasks);
return expectedNumTasks.test(info.tasks().size())
&& info.connector().state().equals(connectorState.toString())
&& info.tasks().stream().filter(s -> s.state().equals(tasksState.toString())).count() == expectedTasksInState;
} catch (Exception e) {
log.error("Could not check connector state info.", e);
lastInfo.set(null);
lastInfoError.set(e);
return false;
}
},
maxWaitMs,
() -> {
String result = conditionMessage;
if (lastInfo.get() != null) {
return result + " When last checked, " + stateSummary(lastInfo.get());
} else if (lastInfoError.get() != null) {
result += " The last attempt to check the connector state failed: " + lastInfoError.get().getClass();
String exceptionMessage = lastInfoError.get().getMessage();
if (exceptionMessage != null) {
result += ": " + exceptionMessage;
}
return result;
} else {
return result;
}
}
);
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
}
/**
* Check whether the given connector state matches the current state of the connector and
* whether it has at least the given number of tasks, with all the tasks matching the given
* task state.
* @param connectorName the connector
* @param connectorState
* @param numTasks the expected number of tasks
* @param tasksState
* @return true if the connector and tasks are in the expected state; false otherwise
*/
protected Optional<Boolean> checkConnectorState(
String connectorName,
AbstractStatus.State connectorState,
int numTasks,
AbstractStatus.State tasksState,
BiFunction<Integer, Integer, Boolean> comp
) {
try {
ConnectorStateInfo info = connect.connectorStatus(connectorName);
boolean result = info != null
&& comp.apply(info.tasks().size(), numTasks)
&& info.connector().state().equals(connectorState.toString())
&& info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
return Optional.of(result);
} catch (Exception e) {
log.error("Could not check connector state info.", e);
return Optional.empty();
}
}
/**
* Check whether the given connector state matches the current state of the connector and
* whether it has at least the given number of tasks, with numTasksInTasksState matching the given
* task state.
* @param connectorName the connector
* @param connectorState
* @param numTasks the expected number of tasks
* @param tasksState
* @return true if the connector and tasks are in the expected state; false otherwise
*/
protected Optional<Boolean> checkConnectorState(
String connectorName,
AbstractStatus.State connectorState,
int numTasks,
int numTasksInTasksState,
AbstractStatus.State tasksState,
BiFunction<Integer, Integer, Boolean> comp
) {
try {
ConnectorStateInfo info = connect.connectorStatus(connectorName);
boolean result = info != null
&& comp.apply(info.tasks().size(), numTasks)
&& info.connector().state().equals(connectorState.toString())
&& info.tasks().stream().filter(s -> s.state().equals(tasksState.toString())).count() == numTasksInTasksState;
return Optional.of(result);
} catch (Exception e) {
log.error("Could not check connector state info.", e);
return Optional.empty();
}
}
/**
* Assert that a connector's set of active topics matches the given collection of topic names.
*
@ -615,4 +593,24 @@ public class ConnectAssertions {
return Optional.empty();
}
}
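/**
* Build a human-readable summary of the connector's state and the states of its tasks,
* for use in wait-timeout messages.
*/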
private static String stateSummary(ConnectorStateInfo info) {
String result = "the connector was " + info.connector().state();
if (info.tasks().isEmpty()) {
return result + " and no tasks were running";
} else {
String taskStates = info.tasks().stream()
.map(ConnectorStateInfo.TaskState::state)
.collect(Collectors.joining(", "));
return result + " and its tasks were in these states: " + taskStates;
}
}
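// Predicates over the observed number of tasks, used by the assertion methods above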
private static Predicate<Integer> exactly(int expected) {
return actual -> actual == expected;
}
private static Predicate<Integer> atLeast(int expected) {
return actual -> actual >= expected;
}
}