diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 51f99198de9..d84d45abded 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -159,7 +159,7 @@
               files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
-              files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
+              files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
     private final ConcurrentMap<String, Connector> tempConnectors = new ConcurrentHashMap<>();
@@ -143,6 +147,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
         this.configBackingStore = configBackingStore;
         this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
         this.connectorExecutor = Executors.newCachedThreadPool();
+        this.time = time;
         this.loggers = new Loggers(time);
     }
@@ -394,9 +399,17 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
     @Override
     public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback, boolean doLog) {
+        Stage waitingForThread = new Stage(
+                "waiting for a new thread to become available for connector validation",
+                time.milliseconds()
+        );
+        callback.recordStage(waitingForThread);
         connectorExecutor.submit(() -> {
+            waitingForThread.complete(time.milliseconds());
             try {
-                ConfigInfos result = validateConnectorConfig(connectorProps, doLog);
+                Function<String, TemporaryStage> reportStage = description ->
+                        new TemporaryStage(description, callback, time);
+                ConfigInfos result = validateConnectorConfig(connectorProps, reportStage, doLog);
                 callback.onCompletion(null, result);
             } catch (Throwable t) {
                 callback.onCompletion(t, null);
@@ -468,9 +481,17 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
                 || SinkConnectorConfig.hasDlqTopicConfig(connProps);
     }

-    ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean doLog) {
+    ConfigInfos validateConnectorConfig(
+            Map<String, String> connectorProps,
+            Function<String, TemporaryStage> reportStage,
+            boolean doLog
+    ) {
+        String stageDescription;
         if (worker.configTransformer() != null) {
-            connectorProps = worker.configTransformer().transform(connectorProps);
+            stageDescription = "resolving transformed configuration properties for the connector";
+            try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+                connectorProps = worker.configTransformer().transform(connectorProps);
+            }
         }
         String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
         if (connType == null)
@@ -485,11 +506,17 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
             if (connector instanceof SourceConnector) {
                 connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
                 enrichedConfigDef = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.configDef(), connectorProps, false);
-                validatedConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, connectorProps);
+                stageDescription = "validating source connector-specific properties for the connector";
+                try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+                    validatedConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, connectorProps);
+                }
             } else {
                 connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
                 enrichedConfigDef = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.configDef(), connectorProps, false);
-                validatedConnectorConfig = validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, connectorProps);
+                stageDescription = "validating sink connector-specific properties for the connector";
+                try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+                    validatedConnectorConfig = validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, connectorProps);
+                }
             }

             connectorProps.entrySet().stream()
@@ -505,7 +532,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
         Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());

         // do custom connector-specific validation
-        ConfigDef configDef = connector.config();
+        ConfigDef configDef;
+        stageDescription = "retrieving the configuration definition from the connector";
+        try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+            configDef = connector.config();
+        }
         if (null == configDef) {
             throw new BadRequestException(
                     String.format(
@@ -514,7 +545,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
                     )
             );
         }
-        Config config = connector.validate(connectorProps);
+
+        Config config;
+        stageDescription = "performing multi-property validation for the connector";
+        try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+            config = connector.validate(connectorProps);
+        }
         if (null == config) {
             throw new BadRequestException(
                     String.format(
@@ -535,37 +571,46 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
         ConfigInfos adminConfigInfos = null;

         if (connectorUsesProducer(connectorType, connectorProps)) {
-            producerConfigInfos = validateClientOverrides(
-                    connName,
-                    ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
-                    connectorConfig,
-                    ProducerConfig.configDef(),
-                    connector.getClass(),
-                    connectorType,
-                    ConnectorClientConfigRequest.ClientType.PRODUCER,
-                    connectorClientConfigOverridePolicy);
+            stageDescription = "validating producer config overrides for the connector";
+            try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+                producerConfigInfos = validateClientOverrides(
+                        connName,
+                        ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+                        connectorConfig,
+                        ProducerConfig.configDef(),
+                        connector.getClass(),
+                        connectorType,
+                        ConnectorClientConfigRequest.ClientType.PRODUCER,
+                        connectorClientConfigOverridePolicy);
+            }
         }
         if (connectorUsesAdmin(connectorType, connectorProps)) {
-            adminConfigInfos = validateClientOverrides(
-                    connName,
-                    ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
-                    connectorConfig,
-                    AdminClientConfig.configDef(),
-                    connector.getClass(),
-                    connectorType,
-                    ConnectorClientConfigRequest.ClientType.ADMIN,
-                    connectorClientConfigOverridePolicy);
+            stageDescription = "validating admin config overrides for the connector";
+            try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+                adminConfigInfos = validateClientOverrides(
+                        connName,
+                        ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+                        connectorConfig,
+                        AdminClientConfig.configDef(),
+                        connector.getClass(),
+                        connectorType,
+                        ConnectorClientConfigRequest.ClientType.ADMIN,
+                        connectorClientConfigOverridePolicy);
+            }
         }
         if (connectorUsesConsumer(connectorType, connectorProps)) {
-            consumerConfigInfos = validateClientOverrides(
-                    connName,
-                    ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
-                    connectorConfig,
-                    ConsumerConfig.configDef(),
-                    connector.getClass(),
-                    connectorType,
-                    ConnectorClientConfigRequest.ClientType.CONSUMER,
-                    connectorClientConfigOverridePolicy);
+            stageDescription = "validating consumer config overrides for the connector";
+            try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+                consumerConfigInfos =
validateClientOverrides( + connName, + ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, + connectorConfig, + ConsumerConfig.configDef(), + connector.getClass(), + connectorType, + ConnectorClientConfigRequest.ClientType.CONSUMER, + connectorClientConfigOverridePolicy); + } } return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 92f513a9f74..4eb77169e18 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -75,6 +75,8 @@ import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.Stage; +import org.apache.kafka.connect.util.TemporaryStage; import org.slf4j.Logger; import javax.crypto.KeyGenerator; @@ -110,6 +112,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.common.utils.Utils.UncheckedCloseable; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0; @@ -213,6 +216,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private volatile long keyExpiration; private short currentProtocolVersion; private short backoffRetries; + private volatile DistributedHerderRequest currentRequest; + private volatile Stage tickThreadStage; // visible for testing // The latest pending restart request for each named connector @@ -333,6 +338,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { keyExpiration = Long.MAX_VALUE; sessionKey = null; backoffRetries = BACKOFF_RETRIES; + currentRequest = null; + tickThreadStage = new Stage("awaiting startup", time.milliseconds()); currentProtocolVersion = ConnectProtocolCompatibility.compatibility( config.getString(DistributedConfig.CONNECT_PROTOCOL_CONFIG) @@ -362,7 +369,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.info("Herder starting"); herderThread = Thread.currentThread(); - startServices(); + try (TickThreadStage stage = new TickThreadStage("reading to the end of internal topics")) { + startServices(); + } log.info("Herder started"); running = true; @@ -371,6 +380,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { tick(); } + recordTickThreadStage("shutting down"); halt(); log.info("Herder stopped"); @@ -402,7 +412,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } log.debug("Ensuring group membership is still active"); - member.ensureActive(); + String stageDescription = "ensuring membership in the cluster"; + member.ensureActive(() -> new TickThreadStage(stageDescription)); + completeTickThreadStage(); // Ensure we're in a good state in our group. 
If not restart and everything should be setup to rejoin if (!handleRebalanceCompleted()) return; } catch (WakeupException e) { @@ -418,7 +430,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // We were accidentally fenced out, possibly by a zombie leader try { log.debug("Reclaiming write privileges for config topic after being fenced out"); - configBackingStore.claimWritePrivileges(); + try (TickThreadStage stage = new TickThreadStage("reclaiming write privileges for the config topic")) { + configBackingStore.claimWritePrivileges(); + } fencedFromConfigTopic = false; log.debug("Successfully reclaimed write privileges for config topic after being fenced out"); } catch (Exception e) { @@ -440,7 +454,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable { keyExpiration = Long.MAX_VALUE; try { SessionKey newSessionKey = new SessionKey(keyGenerator.generateKey(), now); - writeToConfigTopicAsLeader(() -> configBackingStore.putSessionKey(newSessionKey)); + writeToConfigTopicAsLeader( + "writing a new session key to the config topic", + () -> configBackingStore.putSessionKey(newSessionKey) + ); } catch (Exception e) { log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying", e); canReadConfigs = false; @@ -461,7 +478,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (next == null) { break; } else if (now >= next.at) { - requests.pollFirst(); + currentRequest = requests.pollFirst(); } else { scheduledTick = next.at; break; @@ -534,7 +551,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.trace("Polling for group activity; will wait for {}ms or until poll is interrupted by " + "either config backing store updates or a new external request", nextRequestTimeoutMs); - member.poll(nextRequestTimeoutMs); + String pollDurationDescription = scheduledTick != null ? "for up to " + nextRequestTimeoutMs + "ms or " : ""; + String stageDescription = "polling the group coordinator " + pollDurationDescription + "until interrupted"; + member.poll(nextRequestTimeoutMs, () -> new TickThreadStage(stageDescription)); + completeTickThreadStage(); // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin handleRebalanceCompleted(); } catch (WakeupException e) { // FIXME should not be WakeupException @@ -694,13 +714,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { boolean remains = configState.contains(connectorName); log.info("Handling connector-only config update by {} connector {}", remains ? 
"restarting" : "stopping", connectorName); - worker.stopAndAwaitConnector(connectorName); + try (TickThreadStage stage = new TickThreadStage("stopping connector " + connectorName)) { + worker.stopAndAwaitConnector(connectorName); + } // The update may be a deletion, so verify we actually need to restart the connector if (remains) { connectorsToStart.add(getConnectorStartingCallable(connectorName)); } } - startAndStop(connectorsToStart); + String stageDescription = "restarting " + connectorsToStart.size() + " reconfigured connectors"; + startAndStop(connectorsToStart, stageDescription); } private void processTargetStateChanges(Set connectorTargetStateChanges) { @@ -758,7 +781,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } log.info("Handling task config update by stopping tasks {}, which will be restarted after rebalance if still assigned to this worker", tasksToStop); - worker.stopAndAwaitTasks(tasksToStop); + try (TickThreadStage stage = new TickThreadStage("stopping " + tasksToStop.size() + " reconfigured tasks")) { + worker.stopAndAwaitTasks(tasksToStop); + } tasksToRestart.addAll(tasksToStop); } @@ -824,7 +849,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { callback.onCompletion(null, configState.connectors()); return null; }, - forwardErrorCallback(callback) + forwardErrorAndTickThreadStages(callback) ); } @@ -845,7 +870,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } return null; }, - forwardErrorCallback(callback) + forwardErrorAndTickThreadStages(callback) ); } @@ -865,7 +890,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } return null; }, - forwardErrorCallback(callback) + forwardErrorAndTickThreadStages(callback) ); } @@ -894,12 +919,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable { callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); } else { log.trace("Removing connector config {} {}", connName, configState.connectors()); - writeToConfigTopicAsLeader(() -> configBackingStore.removeConnectorConfig(connName)); + writeToConfigTopicAsLeader( + "removing the config for connector " + connName + " from the config topic", + () -> configBackingStore.removeConnectorConfig(connName) + ); callback.onCompletion(null, new Created<>(false, null)); } return null; }, - forwardErrorCallback(callback) + forwardErrorAndTickThreadStages(callback) ); } @@ -1058,7 +1086,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.trace("Submitting connector config write request {}", connName); addRequest( () -> { - validateConnectorConfig(config, (error, configInfos) -> { + validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> { if (error != null) { callback.onCompletion(error, null); return; @@ -1085,7 +1113,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); - writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config, targetState)); + writeToConfigTopicAsLeader( + "writing a config for connector " + connName + " to the config topic", + () -> configBackingStore.putConnectorConfig(connName, config, targetState) + ); // Note that we use the updated connector config despite the fact that we don't have an updated // snapshot yet. The existing task info should still be accurate. 
@@ -1094,12 +1125,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                             callback.onCompletion(null, new Created<>(!exists, info));
                             return null;
                         },
-                        forwardErrorCallback(callback)
+                        forwardErrorAndTickThreadStages(callback)
                     );
-                });
+                }));
                 return null;
             },
-            forwardErrorCallback(callback)
+            forwardErrorAndTickThreadStages(callback)
         );
     }
@@ -1124,7 +1155,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 // a non-empty set of task configs). A STOPPED connector with a non-empty set of tasks is less acceptable
                 // and likely to confuse users.
                 writeTaskConfigs(connName, Collections.emptyList());
-                configBackingStore.putTargetState(connName, TargetState.STOPPED);
+                String stageDescription = "writing the STOPPED target state for connector " + connName + " to the config topic";
+                try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
+                    configBackingStore.putTargetState(connName, TargetState.STOPPED);
+                }
                 // Force a read of the new target state for the connector
                 if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
                     log.warn("Failed to read to end of config topic after writing the STOPPED target state for connector {}", connName);
@@ -1133,7 +1167,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 callback.onCompletion(null, null);
                 return null;
             },
-            forwardErrorCallback(callback)
+            forwardErrorAndTickThreadStages(callback)
         );
     }
@@ -1176,7 +1210,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 }
                 return null;
             },
-            forwardErrorCallback(callback)
+            forwardErrorAndTickThreadStages(callback)
         );
     }
@@ -1199,7 +1233,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 }
                 return null;
             },
-            forwardErrorCallback(callback)
+            forwardErrorAndTickThreadStages(callback)
         );
     }
@@ -1232,7 +1266,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), fenceUrl);
             forwardRequestExecutor.execute(() -> {
                 try {
-                    restClient.httpRequest(fenceUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
+                    String stageDescription = "Forwarding zombie fencing request to the leader at " + workerUrl;
+                    try (TemporaryStage stage = new TemporaryStage(stageDescription, callback, time)) {
+                        restClient.httpRequest(fenceUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
+                    }
                     callback.onCompletion(null, null);
                 } catch (Throwable t) {
                     callback.onCompletion(t, null);
@@ -1261,7 +1298,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         addRequest(() -> {
             doFenceZombieSourceTasks(connName, callback);
             return null;
-        }, forwardErrorCallback(callback));
+        }, forwardErrorAndTickThreadStages(callback));
     }

     private void doFenceZombieSourceTasks(String connName, Callback<Void> callback) {
@@ -1325,7 +1362,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 log.debug("Skipping zombie fencing round but writing task count record for connector {} "
                         + "as both the most recent and the current generation of task configs only contain one task", connName);
             }
-            writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, taskCount));
+            writeToConfigTopicAsLeader(
+                    "writing a task count record for connector " + connName + " to the config topic",
+                    () -> configBackingStore.putTaskCountRecord(connName, taskCount)
+            );
         }
         callback.onCompletion(null, null);
     }
@@ -1351,7 +1391,9 @@ public
class DistributedHerder extends AbstractHerder implements Runnable { if (assignment.connectors().contains(connName)) { try { - worker.stopAndAwaitConnector(connName); + try (TickThreadStage stage = new TickThreadStage("stopping restarted connector " + connName)) { + worker.stopAndAwaitConnector(connName); + } startConnector(connName, callback); } catch (Throwable t) { callback.onCompletion(t, null); @@ -1363,7 +1405,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } return null; }, - forwardErrorCallback(callback)); + forwardErrorAndTickThreadStages(callback)); } @Override @@ -1384,7 +1426,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } if (assignment.tasks().contains(id)) { - try { + try (TickThreadStage stage = new TickThreadStage("restarting task " + id)) { worker.stopAndAwaitTask(id); if (startTask(id)) callback.onCompletion(null, null); @@ -1400,7 +1442,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } return null; }, - forwardErrorCallback(callback)); + forwardErrorAndTickThreadStages(callback)); } @Override @@ -1422,7 +1464,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } if (isLeader()) { // Write a restart request to the config backing store, to be executed asynchronously in tick() - configBackingStore.putRestartRequest(request); + String stageDescription = "writing restart request for connector " + request.connectorName() + " to the config topic"; + try (TickThreadStage stage = new TickThreadStage(stageDescription)) { + configBackingStore.putRestartRequest(request); + } // Compute and send the response that this was accepted Optional plan = buildRestartPlan(request); if (!plan.isPresent()) { @@ -1435,7 +1480,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } return null; }, - forwardErrorCallback(callback) + forwardErrorAndTickThreadStages(callback) ); } @@ -1455,7 +1500,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { pendingRestartRequests.clear(); } restartRequests.forEach(restartRequest -> { - try { + String stageDescription = "handling restart request for connector " + restartRequest.connectorName(); + try (TickThreadStage stage = new TickThreadStage(stageDescription)) { doRestartConnectorAndTasks(restartRequest); } catch (Exception e) { log.warn("Unexpected error while trying to process " + restartRequest + ", the restart request will be skipped.", e); @@ -1488,12 +1534,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable { final boolean restartConnector = plan.shouldRestartConnector() && currentAssignments.connectors().contains(connectorName); final boolean restartTasks = !assignedIdsToRestart.isEmpty(); if (restartConnector) { - worker.stopAndAwaitConnector(connectorName); + String stageDescription = "stopping to-be-restarted connector " + connectorName; + try (TickThreadStage stage = new TickThreadStage(stageDescription)) { + worker.stopAndAwaitConnector(connectorName); + } onRestart(connectorName); } if (restartTasks) { + String stageDescription = "stopping " + assignedIdsToRestart.size() + " to-be-restarted tasks for connector " + connectorName; // Stop the tasks and mark as restarting - worker.stopAndAwaitTasks(assignedIdsToRestart); + try (TickThreadStage stage = new TickThreadStage(stageDescription)) { + worker.stopAndAwaitTasks(assignedIdsToRestart); + } assignedIdsToRestart.forEach(this::onRestart); } @@ -1538,7 +1590,7 @@ public class 
DistributedHerder extends AbstractHerder implements Runnable { super.connectorOffsets(connName, cb); return null; }, - forwardErrorCallback(cb) + forwardErrorAndTickThreadStages(cb) ); } @@ -1563,18 +1615,22 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // We need to ensure that we perform the necessary checks again before proceeding to actually altering / resetting the connector offsets since // zombie fencing is done asynchronously and the conditions could have changed since the previous check addRequest(() -> { - if (modifyConnectorOffsetsChecks(connName, callback)) { - worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback); + try (TickThreadStage stage = new TickThreadStage("modifying offsets for connector " + connName)) { + if (modifyConnectorOffsetsChecks(connName, callback)) { + worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback); + } } return null; - }, forwardErrorCallback(callback)); + }, forwardErrorAndTickThreadStages(callback)); } }); } else { - worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback); + try (TickThreadStage stage = new TickThreadStage("modifying offsets for connector " + connName)) { + worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback); + } } return null; - }, forwardErrorCallback(callback)); + }, forwardErrorAndTickThreadStages(callback)); } /** @@ -1641,11 +1697,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * Note that it is not necessary to wrap every write to the config topic in this method, only the writes that should be performed * exclusively by the leader. For example, {@link ConfigBackingStore#putTargetState(String, TargetState)} does not require this * method, as it can be invoked by any worker in the cluster. + * @param description a description of the action; e.g., "writing 5 task configs for connector file-source" * @param write the action that writes to the config topic, such as {@link ConfigBackingStore#putSessionKey(SessionKey)} or * {@link ConfigBackingStore#putConnectorConfig(String, Map, TargetState)}. 
     */
-    private void writeToConfigTopicAsLeader(Runnable write) {
-        try {
+    private void writeToConfigTopicAsLeader(String description, Runnable write) {
+        try (TickThreadStage stage = new TickThreadStage(description)) {
             write.run();
         } catch (PrivilegedWriteException e) {
             log.warn("Failed to write to config topic as leader; will rejoin group if necessary and, if still leader, attempt to reclaim write privileges for the config topic", e);
@@ -1782,7 +1839,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
      * @return true if successful; false if timed out
      */
     private boolean refreshConfigSnapshot(long timeoutMs) {
-        try {
+        try (TickThreadStage stage = new TickThreadStage("reading to the end of the config topic")) {
             configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
             configState = configBackingStore.snapshot();
             log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
@@ -1796,14 +1853,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {

     private void backoff(long ms) {
         if (ConnectProtocolCompatibility.fromProtocolVersion(currentProtocolVersion) == EAGER) {
-            time.sleep(ms);
+            String stageDescription = "sleeping for a backoff duration of " + ms + "ms";
+            try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
+                time.sleep(ms);
+            }
             return;
         }

         if (backoffRetries > 0) {
             int rebalanceDelayFraction =
                     config.getInt(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG) / 10 / backoffRetries;
-            time.sleep(rebalanceDelayFraction);
+            String stageDescription = "sleeping for a backoff duration of " + rebalanceDelayFraction + "ms";
+            try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
+                time.sleep(rebalanceDelayFraction);
+            }
             --backoffRetries;
             return;
         }
@@ -1820,8 +1883,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }

     // Visible for testing
-    void startAndStop(Collection<Callable<Void>> callables) {
-        try {
+    void startAndStop(Collection<Callable<Void>> callables, String stageDescription) {
+        if (callables.isEmpty())
+            return;
+
+        try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
             startAndStopExecutor.invokeAll(callables);
         } catch (InterruptedException e) {
             // ignore
@@ -1864,7 +1930,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             }
         }

-        startAndStop(callables);
+        String stageDescription = "starting " + callables.size() + " connector(s) and task(s) after a rebalance";
+        startAndStop(callables, stageDescription);

         synchronized (this) {
             runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
@@ -1982,12 +2049,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         callback.onCompletion(null, null);
                         return null;
                     },
-                    forwardErrorCallback(callback)
+                    forwardErrorAndTickThreadStages(callback)
                 );
             } else {
                 callback.onCompletion(null, null);
             }
         };
+        callback.recordStage(new Stage("starting the connector", time.milliseconds()));
         worker.startConnector(connectorName, configProps, ctx, this, initialState, onInitialStateChange);
     }
@@ -2101,8 +2169,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             } else {
                 connConfig = new SourceConnectorConfig(plugins(), configs, worker.isTopicCreationEnabled());
             }
-
-            final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
+            final List<Map<String, String>> taskProps;
+            try (TickThreadStage stage = new TickThreadStage("generating task configs for connector " + connName)) {
+                taskProps = worker.connectorTaskConfigs(connName, connConfig);
+            }
             publishConnectorTaskConfigs(connName, taskProps, cb);
         } catch (Throwable t) {
             cb.onCompletion(t, null);
@@ -2144,7 +2214,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     .build()
                     .toString();
             log.trace("Forwarding task configurations for connector {} to leader", connName);
-            restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
+            String stageDescription = "Forwarding task configurations to the leader at " + leaderUrl;
+            try (TemporaryStage stage = new TemporaryStage(stageDescription, cb, time)) {
+                restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
+            }
             cb.onCompletion(null, null);
         } catch (ConnectException e) {
             log.error("Request to leader to reconfigure connector tasks failed", e);
@@ -2160,7 +2233,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             throw new BadRequestException("Cannot submit non-empty set of task configs for stopped connector " + connName);
         }

-        writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, taskConfigs));
+        writeToConfigTopicAsLeader(
+                "writing " + taskConfigs.size() + " task configs for connector " + connName + " to the config topic",
+                () -> configBackingStore.putTaskConfigs(connName, taskConfigs)
+        );
     }

     // Invoked by exactly-once worker source tasks after they have successfully initialized their transactional
@@ -2175,7 +2251,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {

         addRequest(
                 () -> verifyTaskGenerationAndOwnership(id, initialTaskGen, verifyCallback),
-                forwardErrorCallback(verifyCallback)
+                forwardErrorAndTickThreadStages(verifyCallback)
         );

         try {
@@ -2248,6 +2324,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         // queue was added
         if (peekWithoutException() == req)
             member.wakeup();
+        callback.recordStage(tickThreadStage);
         return req;
     }
@@ -2257,6 +2334,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             callback.onCompletion(null, null);
         } catch (Throwable t) {
             callback.onCompletion(t, null);
+        } finally {
+            currentRequest = null;
         }
     }
@@ -2439,11 +2518,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }

-    private static Callback<Void> forwardErrorCallback(final Callback<?> callback) {
-        return (error, result) -> {
+    private static Callback<Void> forwardErrorAndTickThreadStages(final Callback<?> callback) {
+        Callback<Void> cb = callback.chainStaging((error, result) -> {
             if (error != null)
                 callback.onCompletion(error, null);
-        };
+        });
+        return cb;
     }

     private void updateDeletedConnectorStatus() {
@@ -2574,7 +2654,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {

             // The actual timeout for graceful task/connector stop is applied in worker's
             // stopAndAwaitTask/stopAndAwaitConnector methods.
-            startAndStop(callables);
+            String stageDescription = "stopping " + connectors.size() + " connectors and " + tasks.size() + " tasks";
+            startAndStop(callables, stageDescription);
             log.info("Finished stopping tasks in preparation for rebalance");

             synchronized (DistributedHerder.this) {
@@ -2593,7 +2674,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 // Ensure that all status updates have been pushed to the storage system before rebalancing.
                 // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
                 // completes.
-                statusBackingStore.flush();
+                try (TickThreadStage stage = new TickThreadStage("flushing updates to the status topic")) {
+                    statusBackingStore.flush();
+                }
                 log.info("Finished flushing status backing store in preparation for rebalance");
             } else {
                 log.info("Wasn't able to resume work after last rebalance, can skip stopping connectors and tasks");
             }
         }
@@ -2601,14 +2684,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         private void resetActiveTopics(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
-            connectors.stream()
-                    .filter(connectorName -> !configState.contains(connectorName))
-                    .forEach(DistributedHerder.this::resetConnectorActiveTopics);
-            tasks.stream()
-                    .map(ConnectorTaskId::connector)
-                    .distinct()
-                    .filter(connectorName -> !configState.contains(connectorName))
-                    .forEach(DistributedHerder.this::resetConnectorActiveTopics);
+            String stageDescription = "resetting the list of active topics for " + connectors.size() + " connectors and " + tasks.size() + " tasks";
+            try (TickThreadStage stage = new TickThreadStage(stageDescription)) {
+                connectors.stream()
+                        .filter(connectorName -> !configState.contains(connectorName))
+                        .forEach(DistributedHerder.this::resetConnectorActiveTopics);
+                tasks.stream()
+                        .map(ConnectorTaskId::connector)
+                        .distinct()
+                        .filter(connectorName -> !configState.contains(connectorName))
+                        .forEach(DistributedHerder.this::resetConnectorActiveTopics);
+            }
         }
     }
@@ -2666,6 +2752,43 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return result;
     }

+    private synchronized void recordTickThreadStage(String description) {
+        assert description != null;
+
+        log.trace("Recording new tick thread stage: {}", description);
+
+        // Preserve the current stage to report to requests submitted after this method is invoked
+        this.tickThreadStage = new Stage(description, time.milliseconds());
+        // Report the current stage to all pending requests
+        requests.forEach(request -> request.callback().recordStage(tickThreadStage));
+        // And, if there is an in-progress request, report the stage to it as well
+        Optional.ofNullable(currentRequest).map(DistributedHerderRequest::callback)
+                .ifPresent(callback -> callback.recordStage(tickThreadStage));
+    }
+
+    private synchronized void completeTickThreadStage() {
+        // This is expected behavior with nested stages; can just no-op
+        if (tickThreadStage == null)
+            return;
+
+        log.trace("Completing current tick thread stage; was {}", tickThreadStage);
+
+        tickThreadStage.complete(time.milliseconds());
+        tickThreadStage = null;
+    }
+
+    private class TickThreadStage implements UncheckedCloseable {
+        public TickThreadStage(String description) {
+            if (description != null)
+                recordTickThreadStage(description);
+        }
+
+        @Override
+        public void close() {
+            completeTickThreadStage();
+        }
+    }
+
     /**
      * Represents an active zombie fencing: that is, an in-progress attempt to invoke
      * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
@@ -2699,18 +2822,21 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             if (fencingFuture != null) {
                 throw new IllegalStateException("Cannot invoke start() multiple times");
             }
-            fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
-                // This callback will be called on the same thread that invokes KafkaFuture::thenApply if
-                // the future is already completed.
Since that thread is the herder tick thread, we don't need - // to perform follow-up logic through an additional herder request (and if we tried, it would lead - // to deadlock) - runOnTickThread( - this::onZombieFencingSuccess, - fencingFollowup - ); - awaitFollowup(); - return null; - }); + String stageDescription = "initiating a round of zombie fencing for connector " + connName; + try (TickThreadStage stage = new TickThreadStage(stageDescription)) { + fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> { + // This callback will be called on the same thread that invokes KafkaFuture::thenApply if + // the future is already completed. Since that thread is the herder tick thread, we don't need + // to perform follow-up logic through an additional herder request (and if we tried, it would lead + // to deadlock) + runOnTickThread( + this::onZombieFencingSuccess, + fencingFollowup + ); + awaitFollowup(); + return null; + }); + } // Immediately after the fencing and necessary followup work (i.e., writing the task count record to the config topic) // is complete, remove this from the list of active fencings addCallback((ignored, error) -> { @@ -2736,7 +2862,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (fencingFollowup.isDone()) { return null; } - writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord)); + writeToConfigTopicAsLeader( + "writing a task count record for connector " + connName + " to the config topic", + () -> configBackingStore.putTaskCountRecord(connName, tasksToRecord) + ); return null; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index fab9e0a65e7..a57dc69f4f5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -42,11 +42,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import static org.apache.kafka.common.utils.Utils.UncheckedCloseable; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER; + /** * This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments * to workers. @@ -120,7 +123,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable return super.ensureCoordinatorReady(timer); } - public void poll(long timeout) { + public void poll(long timeout, Supplier onPoll) { // poll for io until the timeout expires final long start = time.milliseconds(); long now = start; @@ -130,7 +133,11 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable if (coordinatorUnknown()) { log.debug("Broker coordinator is marked unknown. 
Attempting discovery with a timeout of {}ms", coordinatorDiscoveryTimeoutMs); - if (ensureCoordinatorReady(time.timer(coordinatorDiscoveryTimeoutMs))) { + boolean coordinatorReady; + try (UncheckedCloseable polling = onPoll.get()) { + coordinatorReady = ensureCoordinatorReady(time.timer(coordinatorDiscoveryTimeoutMs)); + } + if (coordinatorReady) { log.debug("Broker coordinator is ready"); } else { log.debug("Can not connect to broker coordinator"); @@ -146,7 +153,9 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable } if (rejoinNeededOrPending()) { - ensureActiveGroup(); + try (UncheckedCloseable polling = onPoll.get()) { + ensureActiveGroup(); + } now = time.milliseconds(); } @@ -158,7 +167,9 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable // Note that because the network client is shared with the background heartbeat thread, // we do not want to block in poll longer than the time to the next heartbeat. long pollTimeout = Math.min(Math.max(0, remaining), timeToNextHeartbeat(now)); - client.poll(time.timer(pollTimeout)); + try (UncheckedCloseable polling = onPoll.get()) { + client.poll(time.timer(pollTimeout)); + } now = time.milliseconds(); elapsed = now - start; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 7334f50fc36..2c3537ea675 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -48,6 +48,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static org.apache.kafka.common.utils.Utils.UncheckedCloseable; /** * This class manages the coordination process with brokers for the Connect cluster group membership. It ties together @@ -158,14 +161,14 @@ public class WorkerGroupMember { * Ensure that the connection to the broker coordinator is up and that the worker is an * active member of the group. 
*/ - public void ensureActive() { - coordinator.poll(0); + public void ensureActive(Supplier onPoll) { + coordinator.poll(0, onPoll); } - public void poll(long timeout) { + public void poll(long timeout, Supplier onPoll) { if (timeout < 0) throw new IllegalArgumentException("Timeout must not be negative"); - coordinator.poll(timeout); + coordinator.poll(timeout, onPoll); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java index 4dfd093bc2e..18c969098b7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java @@ -21,12 +21,15 @@ import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; import org.apache.kafka.connect.runtime.distributed.RequestTargetException; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.FutureCallback; +import org.apache.kafka.connect.util.Stage; +import org.apache.kafka.connect.util.StagedTimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; +import java.time.Instant; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -38,7 +41,7 @@ public class HerderRequestHandler { private final RestClient restClient; - private long requestTimeoutMs; + private volatile long requestTimeoutMs; public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) { this.restClient = restClient; @@ -64,6 +67,22 @@ public class HerderRequestHandler { return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw e.getCause(); + } catch (StagedTimeoutException e) { + String message; + Stage stage = e.stage(); + if (stage.completed() != null) { + message = "Request timed out. The last operation the worker completed was " + + stage.description() + ", which began at " + + Instant.ofEpochMilli(stage.started()) + " and completed at " + + Instant.ofEpochMilli(stage.completed()); + } else { + message = "Request timed out. The worker is currently " + + stage.description() + ", which began at " + + Instant.ofEpochMilli(stage.started()); + } + // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server + // error is the best option + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), message); } catch (TimeoutException e) { // This timeout is for the operation itself. 
None of the timeout error codes are relevant, so internal server // error is the best option diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 037d98b68e6..3c6cce98a05 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -27,6 +27,8 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.PluginInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.FutureCallback; +import org.apache.kafka.connect.util.Stage; +import org.apache.kafka.connect.util.StagedTimeoutException; import javax.ws.rs.BadRequestException; import javax.ws.rs.Consumes; @@ -39,6 +41,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -107,6 +110,22 @@ public class ConnectorPluginsResource implements ConnectResource { try { return validationCallback.get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (StagedTimeoutException e) { + Stage stage = e.stage(); + String message; + if (stage.completed() != null) { + message = "Request timed out. The last operation the worker completed was " + + stage.description() + ", which began at " + + Instant.ofEpochMilli(stage.started()) + " and completed at " + + Instant.ofEpochMilli(stage.completed()); + } else { + message = "Request timed out. The worker is currently " + + stage.description() + ", which began at " + + Instant.ofEpochMilli(stage.started()); + } + // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server + // error is the best option + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), message); } catch (TimeoutException e) { // This timeout is for the operation itself. 
None of the timeout error codes are relevant, so internal server
            // error is the best option
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
index 277863b7f74..c09eba62a23 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
@@ -27,4 +27,22 @@ public interface Callback<V> {
      * @param result the return value, or null if the operation failed
      */
     void onCompletion(Throwable error, V result);
+
+    default void recordStage(Stage stage) {
+    }
+
+    default <V2> Callback<V2> chainStaging(Callback<V2> chained) {
+        return new Callback<V2>() {
+            @Override
+            public void recordStage(Stage stage) {
+                Callback.this.recordStage(stage);
+            }
+
+            @Override
+            public void onCompletion(Throwable error, V2 result) {
+                chained.onCompletion(error, result);
+            }
+        };
+    }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
index 0fbfe5c1fa2..7b541c12ba3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
@@ -40,6 +40,7 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
     private volatile T result = null;
     private volatile Throwable exception = null;
     private volatile boolean cancelled = false;
+    private volatile Stage currentStage = null;

     public ConvertingFutureCallback() {
         this(null);
@@ -110,11 +111,22 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
     @Override
     public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
-        if (!finishedLatch.await(l, timeUnit))
-            throw new TimeoutException("Timed out waiting for future");
+        if (!finishedLatch.await(l, timeUnit)) {
+            Stage stage = currentStage;
+            if (stage != null) {
+                throw new StagedTimeoutException(stage);
+            } else {
+                throw new TimeoutException();
+            }
+        }
         return result();
     }

+    @Override
+    public void recordStage(Stage stage) {
+        this.currentStage = stage;
+    }
+
     private T result() throws ExecutionException {
         if (cancelled) {
             throw new CancellationException();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java
new file mode 100644
index 00000000000..e51e406f332
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kafka.connect.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Stage { + + private static final Logger log = LoggerFactory.getLogger(Stage.class); + + private final String description; + private final long started; + private volatile Long completed; + + public Stage(String description, long started) { + this.description = description; + this.started = started; + this.completed = null; + } + + public String description() { + return description; + } + + public long started() { + return started; + } + + public Long completed() { + return completed; + } + + public synchronized void complete(long time) { + if (time < started) { + log.warn("Ignoring invalid completion time {} since it is before this stage's start time of {}", time, started); + return; + } + + if (completed != null) { + log.warn("Ignoring completion time of {} since this stage was already completed at {}", time, completed); + return; + } + + this.completed = time; + } + + @Override + public String toString() { + return description + "(started " + started + ", completed=" + completed() + ")"; + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/StagedTimeoutException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/StagedTimeoutException.java new file mode 100644 index 00000000000..aff39f9fcce --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/StagedTimeoutException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.Objects; +import java.util.concurrent.TimeoutException; + +public class StagedTimeoutException extends TimeoutException { + + private final Stage stage; + + public StagedTimeoutException(Stage stage) { + super(); + this.stage = Objects.requireNonNull(stage, "Stage may not be null"); + } + + public Stage stage() { + return stage; + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TemporaryStage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TemporaryStage.java new file mode 100644 index 00000000000..4977719b97a --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TemporaryStage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.utils.Time; + +public class TemporaryStage implements AutoCloseable { + + private final Stage stage; + private final Time time; + + public TemporaryStage(String description, Callback callback, Time time) { + this.stage = new Stage(description, time.milliseconds()); + this.time = time; + callback.recordStage(stage); + } + + @Override + public void close() { + stage.complete(time.milliseconds()); + } + +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 33614e317c4..03ace3b5706 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -133,6 +133,11 @@ public class BlockingConnectorTest { NUM_WORKERS, "Initial group of workers did not start in time" ); + + try (Response response = connect.requestGet(connect.endpointForResource("connectors/nonexistent"))) { + // hack: make sure the worker is actually up (has joined the cluster, created and read to the end of internal topics, etc.) + assertEquals(404, response.getStatus()); + } } @After @@ -145,7 +150,11 @@ public class BlockingConnectorTest { @Test public void testBlockInConnectorValidate() throws Exception { log.info("Starting test testBlockInConnectorValidate"); - assertRequestTimesOut("create connector that blocks during validation", () -> createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE)); + assertRequestTimesOut( + "create connector that blocks during validation", + () -> createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE), + "The worker is currently performing multi-property validation for the connector" + ); // Will NOT assert that connector has failed, since the request should fail before it's even created // Connector should already be blocked so this should return immediately, but check just to @@ -159,7 +168,11 @@ public class BlockingConnectorTest { @Test public void testBlockInConnectorConfig() throws Exception { log.info("Starting test testBlockInConnectorConfig"); - assertRequestTimesOut("create connector that blocks while getting config", () -> createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG)); + assertRequestTimesOut( + "create connector that blocks while getting config", + () -> createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG), + "The worker is currently retrieving the configuration definition from the connector" + ); // Will NOT assert that connector has failed, since the request should fail before it's even created // Connector should already be blocked so this should return immediately, but check just to @@ -178,6 +191,13 @@ public class BlockingConnectorTest { createNormalConnector(); verifyNormalConnector(); + + // Try to restart the connector + assertRequestTimesOut( + "restart connector that blocks during initialize", + 
() -> connect.restartConnector(BLOCKING_CONNECTOR_NAME), + "The worker is currently starting the connector" + ); } @Test @@ -188,6 +208,13 @@ public class BlockingConnectorTest { createNormalConnector(); verifyNormalConnector(); + + // Try to restart the connector + assertRequestTimesOut( + "restart connector that blocks during start", + () -> connect.restartConnector(BLOCKING_CONNECTOR_NAME), + "The worker is currently starting the connector" + ); } @Test @@ -329,7 +356,7 @@ public class BlockingConnectorTest { normalConnectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS); } - private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request) { + private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request, String expectedTimeoutMessage) { // Artificially reduce the REST request timeout so that these don't take 90 seconds connect.requestTimeout(REDUCED_REST_REQUEST_TIMEOUT); ConnectRestException exception = assertThrows( @@ -345,6 +372,12 @@ public class BlockingConnectorTest { + "; instead, message was: " + exception.getMessage(), exception.getMessage().contains("Request timed out") ); + if (expectedTimeoutMessage != null) { + assertTrue( + "Timeout error message '" + exception.getMessage() + "' does not match expected format", + exception.getMessage().contains(expectedTimeoutMessage) + ); + } // Reset the REST request timeout so that other requests aren't impacted connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 8a271603eef..6e49120343f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; @@ -44,7 +45,9 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; @@ -58,11 +61,16 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG; 
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG; +import static org.apache.kafka.connect.runtime.rest.resources.ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; /** @@ -771,6 +779,111 @@ public class ConnectWorkerIntegrationTest { return props; } + @Test + public void testRequestTimeouts() throws Exception { + final String configTopic = "test-request-timeout-configs"; + workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); + // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to + // be spuriously triggered after the group coordinator for a Connect cluster is bounced + workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0"); + connect = connectBuilder + .numBrokers(1) + .numWorkers(1) + .build(); + connect.start(); + connect.assertions().assertAtLeastNumWorkersAreUp(1, + "Worker did not start in time"); + + Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); + Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1); + connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1)); + + // Create a connector to ensure that the worker has completed startup + log.info("Creating initial connector"); + connect.configureConnector(CONNECTOR_NAME, connectorConfig1); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time" + ); + + // Bring down Kafka, which should cause some REST requests to fail + log.info("Stopping Kafka cluster"); + connect.kafka().stopOnlyKafka(); + + // Try to reconfigure the connector, which should fail with a timeout error + log.info("Trying to reconfigure connector while Kafka cluster is down"); + assertTimeoutException( + () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2), + "flushing updates to the status topic" + ); + log.info("Restarting Kafka cluster"); + connect.kafka().startOnlyKafkaOnSamePorts(); + connect.assertions().assertExactlyNumBrokersAreUp(1, "Broker did not complete startup in time"); + log.info("Kafka cluster is restarted"); + + // Reconfigure the connector to ensure that the broker has completed startup + log.info("Reconfiguring connector with one more task"); + connect.configureConnector(CONNECTOR_NAME, connectorConfig2); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, NUM_TASKS + 1, "connector and tasks did not start in time" + ); + + // Delete the config topic--what could go wrong?
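+ // (With the config topic deleted, the worker's next write to it can never complete, so the
+ // delete and reconfigure requests below should hang in their final stage until the REST
+ // timeout fires and reports that stage in the error message.)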
+ log.info("Deleting Kafka Connect config topic"); + connect.kafka().deleteTopic(configTopic); + + // Try to delete the connector, which should fail with a slightly-different timeout error + log.info("Trying to reconfigure connector after config topic has been deleted"); + assertTimeoutException( + () -> connect.deleteConnector(CONNECTOR_NAME), + "removing the config for connector " + CONNECTOR_NAME + " from the config topic" + ); + + // The worker should still be blocked on the same operation + log.info("Trying again to reconfigure connector after config topic has been deleted"); + assertTimeoutException( + () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig1), + "removing the config for connector " + CONNECTOR_NAME + " from the config topic" + ); + } + + private void assertTimeoutException(Runnable operation, String expectedStageDescription) throws InterruptedException { + connect.requestTimeout(1_000); + AtomicReference latestError = new AtomicReference<>(); + waitForCondition( + () -> { + try { + operation.run(); + latestError.set(null); + return false; + } catch (Throwable t) { + latestError.set(t); + assertTrue(t instanceof ConnectRestException); + ConnectRestException restException = (ConnectRestException) t; + + assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), restException.statusCode()); + assertNotNull(restException.getMessage()); + assertTrue( + "Message '" + restException.getMessage() + "' does not match expected format", + restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription) + ); + + return true; + } + }, + 30_000, + () -> { + String baseMessage = "REST request did not time out with expected error message in time. "; + Throwable t = latestError.get(); + if (t == null) { + return baseMessage + "The most recent request did not fail."; + } else { + return baseMessage + "Most recent error: " + t; + } + } + ); + connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); + } + private Map defaultSourceConnectorProps(String topic) { // setup props for the source connector Map props = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index eb86920adff..7d7020604cc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -360,7 +360,7 @@ public class AbstractHerderTest { config.put("required", "value"); config.put("testKey", null); - final ConfigInfos configInfos = herder.validateConnectorConfig(config, false); + final ConfigInfos configInfos = herder.validateConnectorConfig(config, s -> null, false); assertEquals(1, configInfos.errorCount()); assertErrorForKey(configInfos, "testKey"); @@ -378,7 +378,7 @@ public class AbstractHerderTest { config.put("testKey", null); config.put("secondTestKey", null); - final ConfigInfos configInfos = herder.validateConnectorConfig(config, false); + final ConfigInfos configInfos = herder.validateConnectorConfig(config, s -> null, false); assertEquals(2, configInfos.errorCount()); assertErrorForKey(configInfos, "testKey"); @@ -448,7 +448,7 @@ public class AbstractHerderTest { public void testConfigValidationEmptyConfig() { AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy, 0); - assertThrows(BadRequestException.class, () -> 
herder.validateConnectorConfig(Collections.emptyMap(), false)); + assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(Collections.emptyMap(), s -> null, false)); verify(transformer).transform(Collections.emptyMap()); } @@ -458,7 +458,7 @@ public class AbstractHerderTest { AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy); Map config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName()); - ConfigInfos result = herder.validateConnectorConfig(config, false); + ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on // the config fields for SourceConnectorConfig, but we expect these to change rarely. @@ -493,7 +493,7 @@ public class AbstractHerderTest { config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2"); config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*"); - ConfigInfos validation = herder.validateConnectorConfig(config, false); + ConfigInfos validation = herder.validateConnectorConfig(config, s -> null, false); ConfigInfo topicsListInfo = findInfo(validation, SinkConnectorConfig.TOPICS_CONFIG); assertNotNull(topicsListInfo); @@ -516,7 +516,7 @@ public class AbstractHerderTest { config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1"); config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1"); - ConfigInfos validation = herder.validateConnectorConfig(config, false); + ConfigInfos validation = herder.validateConnectorConfig(config, s -> null, false); ConfigInfo topicsListInfo = findInfo(validation, SinkConnectorConfig.TOPICS_CONFIG); assertNotNull(topicsListInfo); @@ -535,7 +535,7 @@ public class AbstractHerderTest { config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*"); config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1"); - ConfigInfos validation = herder.validateConnectorConfig(config, false); + ConfigInfos validation = herder.validateConnectorConfig(config, s -> null, false); ConfigInfo topicsRegexInfo = findInfo(validation, SinkConnectorConfig.TOPICS_REGEX_CONFIG); assertNotNull(topicsRegexInfo); @@ -560,7 +560,7 @@ public class AbstractHerderTest { config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB"); config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName()); config.put("required", "value"); // connector required config - ConfigInfos result = herder.validateConnectorConfig(config, false); + ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); assertEquals(herder.connectorType(config), ConnectorType.SOURCE); // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on @@ -616,7 +616,7 @@ public class AbstractHerderTest { config.put(ConnectorConfig.PREDICATES_CONFIG + ".predX.type", SamplePredicate.class.getName()); config.put("required", "value"); // connector required config - ConfigInfos result = herder.validateConnectorConfig(config, false); + ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); assertEquals(ConnectorType.SOURCE, herder.connectorType(config)); // We expect there to be errors due to the missing name and .... 
Note that these assertions depend heavily on @@ -682,7 +682,7 @@ public class AbstractHerderTest { config.put(ackConfigKey, "none"); config.put(saslConfigKey, "jaas_config"); - ConfigInfos result = herder.validateConnectorConfig(config, false); + ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); assertEquals(herder.connectorType(config), ConnectorType.SOURCE); // We expect there to be errors due to now allowed override policy for ACKS.... Note that these assertions depend heavily on @@ -741,7 +741,7 @@ public class AbstractHerderTest { overriddenClientConfigs.add(bootstrapServersConfigKey); overriddenClientConfigs.add(loginCallbackHandlerConfigKey); - ConfigInfos result = herder.validateConnectorConfig(config, false); + ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); assertEquals(herder.connectorType(config), ConnectorType.SOURCE); Map validatedOverriddenClientConfigs = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 5a22e6c10f0..31e6d6dee2b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -69,6 +69,7 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; +import org.apache.kafka.connect.util.Stage; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -91,6 +92,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -101,12 +103,14 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.Collections.singletonList; import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; +import static org.apache.kafka.common.utils.Utils.UncheckedCloseable; import static org.apache.kafka.connect.runtime.AbstractStatus.State.FAILED; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED; @@ -337,7 +341,7 @@ public class DistributedHerderTest { expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); when(worker.startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); time.sleep(1000L); @@ -361,7 +365,7 @@ public class DistributedHerderTest { expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); when(worker.startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); - 
doNothing().when(member).poll(anyLong()); + expectMemberPoll(); time.sleep(1000L); assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY); @@ -399,7 +403,7 @@ public class DistributedHerderTest { expectRebalance(1, Collections.emptyList(), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); time.sleep(1000L); assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY); @@ -444,7 +448,7 @@ public class DistributedHerderTest { ConnectProtocol.Assignment.NO_ERROR, 1, Collections.emptyList(), Collections.emptyList(), 0); doNothing().when(member).requestRejoin(); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.configState = SNAPSHOT; time.sleep(1000L); @@ -485,7 +489,7 @@ public class DistributedHerderTest { doAnswer(invocation -> { time.sleep(9900L); return null; - }).when(member).poll(anyLong()); + }).when(member).poll(anyLong(), any()); // Request to re-join expected because the scheduled rebalance delay has been reached doNothing().when(member).requestRejoin(); @@ -509,7 +513,7 @@ public class DistributedHerderTest { expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); when(worker.startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); time.sleep(2000L); @@ -534,7 +538,7 @@ public class DistributedHerderTest { expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); when(worker.startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); time.sleep(1000L); @@ -591,7 +595,7 @@ public class DistributedHerderTest { when(worker.startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); @@ -649,7 +653,7 @@ public class DistributedHerderTest { // The tick loop where the revoke happens returns early (because there's a subsequent rebalance) and doesn't result in a poll at // the end of the method - verify(member, times(2)).poll(anyLong()); + verify(member, times(2)).poll(anyLong(), any()); verifyNoMoreInteractions(member, statusBackingStore, configBackingStore, worker); } @@ -676,7 +680,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // Initial rebalance where this member becomes the leader herder.tick(); @@ -693,10 +697,12 @@ public class DistributedHerderTest { doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2), eq(CONN2_CONFIG), isNull()); // This will occur just before/during the second tick - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); // No immediate action besides this -- change will be picked up via the config log + List stages = expectRecordStages(putConnectorCallback); + herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback); // This tick runs the initial herder request, which issues an asynchronous request for // connector validation @@ -711,6 +717,14 @@ public class DistributedHerderTest { ConnectorInfo info = new ConnectorInfo(CONN2, 
CONN2_CONFIG, Collections.emptyList(), ConnectorType.SOURCE); verify(putConnectorCallback).onCompletion(isNull(), eq(new Herder.Created<>(true, info))); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback); + + assertEquals( + Arrays.asList( + "ensuring membership in the cluster", + "writing a config for connector " + CONN2 + " to the config topic" + ), + stages + ); } @Test @@ -721,7 +735,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // Initial rebalance where this member becomes the leader herder.tick(); @@ -738,9 +752,10 @@ public class DistributedHerderTest { doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2), eq(CONN2_CONFIG), eq(TargetState.STOPPED)); // This will occur just before/during the second tick - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); // No immediate action besides this -- change will be picked up via the config log + List stages = expectRecordStages(putConnectorCallback); herder.putConnectorConfig(CONN2, CONN2_CONFIG, TargetState.STOPPED, false, putConnectorCallback); // This tick runs the initial herder request, which issues an asynchronous request for @@ -756,6 +771,14 @@ public class DistributedHerderTest { ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.emptyList(), ConnectorType.SOURCE); verify(putConnectorCallback).onCompletion(isNull(), eq(new Herder.Created<>(true, info))); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback); + + assertEquals( + Arrays.asList( + "ensuring membership in the cluster", + "writing a config for connector " + CONN2 + " to the config topic" + ), + stages + ); } @Test @@ -766,7 +789,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // Initial rebalance where this member becomes the leader herder.tick(); @@ -783,7 +806,9 @@ public class DistributedHerderTest { .when(configBackingStore).putConnectorConfig(eq(CONN2), eq(CONN2_CONFIG), isNull()); // This will occur just before/during the second tick - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); + + List stages = expectRecordStages(putConnectorCallback); herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback); // This tick runs the initial herder request, which issues an asynchronous request for @@ -799,6 +824,14 @@ public class DistributedHerderTest { // Verify that the exception thrown during the config backing store write is propagated via the callback verify(putConnectorCallback).onCompletion(any(ConnectException.class), isNull()); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback); + + assertEquals( + Arrays.asList( + "ensuring membership in the cluster", + "writing a config for connector " + CONN2 + " to the config topic" + ), + stages + ); } @Test @@ -809,7 +842,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); HashMap config = new HashMap<>(CONN2_CONFIG); config.remove(ConnectorConfig.NAME_CONFIG); @@ 
-822,11 +855,13 @@ public class DistributedHerderTest { return null; }).when(herder).validateConnectorConfig(eq(config), validateCallback.capture()); + List stages = expectRecordStages(putConnectorCallback); + herder.putConnectorConfig(CONN2, config, false, putConnectorCallback); herder.tick(); // We don't need another rebalance to occur - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); herder.tick(); time.sleep(1000L); assertStatistics(3, 1, 100, 1000L); @@ -835,6 +870,15 @@ public class DistributedHerderTest { verify(putConnectorCallback).onCompletion(error.capture(), isNull()); assertTrue(error.getValue() instanceof BadRequestException); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback); + + assertEquals( + Arrays.asList( + "awaiting startup", + "ensuring membership in the cluster", + "reading to the end of the config topic" + ), + stages + ); } @Test @@ -888,7 +932,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // mock the actual validation since its asynchronous nature is difficult to test and should // be covered sufficiently by the unit tests for the AbstractHerder class @@ -898,11 +942,13 @@ public class DistributedHerderTest { return null; }).when(herder).validateConnectorConfig(eq(CONN1_CONFIG), validateCallback.capture()); + List stages = expectRecordStages(putConnectorCallback); + herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback); herder.tick(); // We don't need another rebalance to occur - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); herder.tick(); time.sleep(1000L); assertStatistics(3, 1, 100, 1000L); @@ -910,6 +956,15 @@ public class DistributedHerderTest { // CONN1 already exists verify(putConnectorCallback).onCompletion(any(AlreadyExistsException.class), isNull()); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback); + + assertEquals( + Arrays.asList( + "awaiting startup", + "ensuring membership in the cluster", + "reading to the end of the config topic" + ), + stages + ); } @Test @@ -928,12 +983,14 @@ public class DistributedHerderTest { expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // And delete the connector doNothing().when(configBackingStore).removeConnectorConfig(CONN1); doNothing().when(putConnectorCallback).onCompletion(null, new Herder.Created<>(false, null)); + List stages = expectRecordStages(putConnectorCallback); + herder.deleteConnectorConfig(CONN1, putConnectorCallback); herder.tick(); @@ -961,6 +1018,17 @@ public class DistributedHerderTest { assertStatistics("leaderUrl", true, 3, 1, 100, 2100L); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback); + + assertEquals( + Arrays.asList( + "awaiting startup", + "ensuring membership in the cluster", + "reading to the end of the config topic", + "starting 1 connector(s) and task(s) after a rebalance", + "removing the config for connector sourceA from the config topic" + ), + stages + ); } @Test @@ -973,7 +1041,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); 
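The stage descriptions asserted in these tests come from Stage objects that the herder records against a request's callback as work progresses. Only a small surface of that class is exercised in this patch; a minimal sketch consistent with that usage (the completed() accessor and the first-completion-wins behavior are assumptions, not taken from this diff):

    // Minimal sketch of the Stage surface relied on above: a description, a start
    // timestamp, and a one-shot completion timestamp.
    public class Stage {

        private final String description;
        private final long started;
        private volatile Long completed;

        public Stage(String description, long started) {
            this.description = description;
            this.started = started;
            this.completed = null;
        }

        public String description() {
            return description;
        }

        // Assumed accessor: lets the REST layer report when (or whether) the stage finished
        public Long completed() {
            return completed;
        }

        public synchronized void complete(long time) {
            // Record only the first completion; later calls are no-ops
            if (completed == null)
                completed = time;
        }
    }
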
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -985,7 +1053,7 @@ public class DistributedHerderTest { // Initial rebalance where this member becomes the leader herder.tick(); - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); doNothing().when(worker).stopAndAwaitConnector(CONN1); @@ -1009,12 +1077,12 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the connector restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); FutureCallback callback = new FutureCallback<>(); herder.restartConnector(CONN2, callback); herder.tick(); @@ -1032,12 +1100,12 @@ public class DistributedHerderTest { expectRebalance(1, Collections.emptyList(), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the connector restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); FutureCallback callback = new FutureCallback<>(); herder.restartConnector(CONN1, callback); @@ -1057,12 +1125,12 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the connector restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); String ownerUrl = "ownerUrl"; when(member.ownerUrl(CONN1)).thenReturn(ownerUrl); @@ -1095,12 +1163,12 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the connector restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); herder.tick(); FutureCallback callback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, callback); @@ -1119,12 +1187,12 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the connector restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); herder.tick(); FutureCallback callback = new FutureCallback<>(); @@ -1146,12 +1214,12 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the connector restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(statusBackingStore.get(CONN1)).thenReturn(null); RestartRequest restartRequest = new RestartRequest(CONN1, false, true); doNothing().when(configBackingStore).putRestartRequest(restartRequest); @@ -1175,12 +1243,12 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); 
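The expectMemberPoll() and expectMemberEnsureActive() helpers used throughout these tests (defined near the bottom of this file) stub the mocked group member so that the stage-reporting hook passed to poll() is still exercised. Spelled out with explicit type parameters, the stubbing pattern reads roughly like this; expectMemberEnsureActive() follows the same pattern against member.ensureActive():

    private void expectMemberPoll() {
        // Capture the Supplier<UncheckedCloseable> that the herder hands to poll(), and
        // immediately run one open/close cycle so that any stage the herder opens around
        // the poll is started and completed just as it would be during a real call.
        @SuppressWarnings("unchecked")
        ArgumentCaptor<Supplier<UncheckedCloseable>> onPoll = ArgumentCaptor.forClass(Supplier.class);
        doAnswer(invocation -> {
            onPoll.getValue().get().close();
            return null;
        }).when(member).poll(anyLong(), onPoll.capture());
    }
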
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the connector restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); RestartPlan restartPlan = mock(RestartPlan.class); ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); @@ -1333,14 +1401,14 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), any())).thenReturn(true); herder.tick(); // now handle the task restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); doNothing().when(worker).stopAndAwaitTask(TASK0); FutureCallback callback = new FutureCallback<>(); herder.restartTask(TASK0, callback); @@ -1359,11 +1427,11 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); FutureCallback callback = new FutureCallback<>(); herder.restartTask(new ConnectorTaskId("blah", 0), callback); herder.tick(); @@ -1381,12 +1449,12 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the task restart - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); FutureCallback callback = new FutureCallback<>(); herder.restartTask(TASK0, callback); herder.tick(); @@ -1406,14 +1474,14 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // now handle the task restart String ownerUrl = "ownerUrl"; when(member.ownerUrl(TASK0)).thenReturn(ownerUrl); - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); FutureCallback callback = new FutureCallback<>(); herder.restartTask(TASK0, callback); @@ -1428,10 +1496,13 @@ public class DistributedHerderTest { @Test public void testRequestProcessingOrder() { - final DistributedHerder.DistributedHerderRequest req1 = herder.addRequest(100, null, null); - final DistributedHerder.DistributedHerderRequest req2 = herder.addRequest(10, null, null); - final DistributedHerder.DistributedHerderRequest req3 = herder.addRequest(200, null, null); - final DistributedHerder.DistributedHerderRequest req4 = herder.addRequest(200, null, null); + Callable action = mock(Callable.class); + Callback callback = mock(Callback.class); + + final DistributedHerder.DistributedHerderRequest req1 = herder.addRequest(100, action, callback); + final DistributedHerder.DistributedHerderRequest req2 = herder.addRequest(10, action, callback); + final DistributedHerder.DistributedHerderRequest req3 = herder.addRequest(200, action, callback); + final DistributedHerder.DistributedHerderRequest req4 = herder.addRequest(200, action, callback); 
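+ // The request queue is expected to order requests by their delay and to break ties by
+ // insertion order (presumably via a per-request sequence number), so req2 (10ms) should
+ // poll first, then req1 (100ms), then req3 and req4 (both 200ms) in the order they were added.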
assertEquals(req2, herder.requests.pollFirst()); // lowest delay assertEquals(req1, herder.requests.pollFirst()); // next lowest delay @@ -1447,7 +1518,7 @@ public class DistributedHerderTest { // join, no configs so no need to catch up on config topic expectRebalance(-1, Collections.emptyList(), Collections.emptyList()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // join @@ -1486,7 +1557,7 @@ public class DistributedHerderTest { // join expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -1498,7 +1569,7 @@ public class DistributedHerderTest { herder.tick(); // apply config - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(configBackingStore.snapshot()).thenReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot doNothing().when(worker).stopAndAwaitConnector(CONN1); @@ -1521,7 +1592,7 @@ public class DistributedHerderTest { // join expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -1533,7 +1604,7 @@ public class DistributedHerderTest { herder.tick(); // apply config - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); // During the next tick, throw an error from the transformer ClusterConfigState snapshotWithTransform = new ClusterConfigState( 1, @@ -1576,7 +1647,7 @@ public class DistributedHerderTest { // join expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -1588,7 +1659,7 @@ public class DistributedHerderTest { herder.tick(); // join // handle the state change - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(configBackingStore.snapshot()).thenReturn(SNAPSHOT_PAUSED_CONN1); ArgumentCaptor> onPause = ArgumentCaptor.forClass(Callback.class); @@ -1613,7 +1684,7 @@ public class DistributedHerderTest { // start with the connector paused expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -1624,7 +1695,7 @@ public class DistributedHerderTest { herder.tick(); // join // handle the state change - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(configBackingStore.snapshot()).thenReturn(SNAPSHOT); ArgumentCaptor> onResume = ArgumentCaptor.forClass(Callback.class); @@ -1653,7 +1724,7 @@ public class DistributedHerderTest { // join expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -1665,7 +1736,7 @@ public class DistributedHerderTest { herder.tick(); // join // handle the state change - doNothing().when(member).ensureActive(); + 
expectMemberEnsureActive(); when(configBackingStore.snapshot()).thenReturn(SNAPSHOT_STOPPED_CONN1); ArgumentCaptor> onStop = ArgumentCaptor.forClass(Callback.class); @@ -1690,14 +1761,14 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.emptyList(), singletonList(TASK0)); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); herder.tick(); // join // state change is ignored since we have no target state - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(configBackingStore.snapshot()).thenReturn(SNAPSHOT); configUpdateListener.onConnectorTargetStateChange("unknown-connector"); @@ -1716,14 +1787,14 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); herder.tick(); // join // handle stop request - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); expectConfigRefreshAndSnapshot(SNAPSHOT); doNothing().when(configBackingStore).putTaskConfigs(CONN1, Collections.emptyList()); doNothing().when(configBackingStore).putTargetState(CONN1, TargetState.STOPPED); @@ -1747,14 +1818,14 @@ public class DistributedHerderTest { // join as member (non-leader) expectRebalance(1, Collections.emptyList(), singletonList(TASK0)); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); herder.tick(); // handle stop request - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); FutureCallback cb = new FutureCallback<>(); herder.stopConnector(CONN1, cb); // external request @@ -1781,7 +1852,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); @@ -1789,7 +1860,7 @@ public class DistributedHerderTest { ConnectException taskConfigsWriteException = new ConnectException("Could not write task configs to config topic"); // handle stop request - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); doThrow(taskConfigsWriteException).when(configBackingStore).putTaskConfigs(CONN1, Collections.emptyList()); // We do not expect configBackingStore::putTargetState to be invoked, which // is intentional since that call should only take place if we are first able to @@ -1822,14 +1893,14 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.emptyList(), singletonList(TASK0)); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); herder.tick(); // join // handle the state change - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(configBackingStore.snapshot()).thenReturn(SNAPSHOT_PAUSED_CONN1); ArgumentCaptor> 
onPause = ArgumentCaptor.forClass(Callback.class); @@ -1856,14 +1927,14 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.emptyList(), singletonList(TASK0)); expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.PAUSED))).thenReturn(true); herder.tick(); // join // handle the state change - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(configBackingStore.snapshot()).thenReturn(SNAPSHOT); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -1889,7 +1960,7 @@ public class DistributedHerderTest { // join expectRebalance(-1, Collections.emptyList(), Collections.emptyList()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // join @@ -1953,7 +2024,7 @@ public class DistributedHerderTest { expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); when(worker.startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds()); @@ -1982,7 +2053,7 @@ public class DistributedHerderTest { expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -2056,7 +2127,7 @@ public class DistributedHerderTest { expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -2135,7 +2206,7 @@ public class DistributedHerderTest { expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); WorkerConfigTransformer configTransformer = mock(WorkerConfigTransformer.class); @@ -2207,7 +2278,7 @@ public class DistributedHerderTest { }).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG), any(), eq(herder), eq(TargetState.STARTED), onStart.capture()); expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // Should pick up original config FutureCallback> connectorConfigCb = new FutureCallback<>(); @@ -2217,7 +2288,7 @@ public class DistributedHerderTest { assertEquals(CONN1_CONFIG, connectorConfigCb.get()); // Poll loop for second round of calls - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); ArgumentCaptor> validateCallback = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -2273,12 +2344,12 @@ public class DistributedHerderTest { expectRebalance(1, Collections.emptyList(), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play - verify(member).poll(eq(Long.MAX_VALUE)); + verify(member).poll(eq(Long.MAX_VALUE), any()); expectRebalance(2, Collections.emptyList(), Collections.emptyList()); SessionKey initialKey = 
new SessionKey(mock(SecretKey.class), 0); @@ -2299,7 +2370,7 @@ public class DistributedHerderTest { herder.tick(); // Second rebalance: poll indefinitely as worker is follower, so expiration still doesn't come into play - verify(member, times(2)).poll(eq(Long.MAX_VALUE)); + verify(member, times(2)).poll(eq(Long.MAX_VALUE), any()); expectRebalance(2, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL, true); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); @@ -2312,7 +2383,7 @@ public class DistributedHerderTest { herder.tick(); // Third rebalance: poll for a limited time as worker has become leader and must wake up for key expiration - verify(member).poll(eq(rotationTtlDelay)); + verify(member).poll(eq(rotationTtlDelay), any()); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } @@ -2340,19 +2411,19 @@ public class DistributedHerderTest { Collections.emptySet(), Collections.emptySet()); expectConfigRefreshAndSnapshot(snapshotWithKey); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); configUpdateListener.onSessionKeyUpdate(initialKey); herder.tick(); // First rebalance: poll for a limited time as worker is leader and must wake up for key expiration - verify(member).poll(leq(rotationTtlDelay)); + verify(member).poll(leq(rotationTtlDelay), any()); expectRebalance(1, Collections.emptyList(), Collections.emptyList()); herder.tick(); // Second rebalance: poll indefinitely as worker is no longer leader, so key expiration doesn't come into play - verify(member).poll(eq(Long.MAX_VALUE)); + verify(member).poll(eq(Long.MAX_VALUE), any()); verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } @@ -2361,11 +2432,16 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); Callback taskConfigCb = mock(Callback.class); + List stages = expectRecordStages(taskConfigCb); herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, null); // Expect a wakeup call after the request to write task configs is added to the herder's request queue verify(member).wakeup(); verifyNoMoreInteractions(member, taskConfigCb); + assertEquals( + Arrays.asList("awaiting startup"), + stages + ); } @Test @@ -2373,11 +2449,16 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); Callback taskConfigCb = mock(Callback.class); + List stages = expectRecordStages(taskConfigCb); herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, null); // Expect a wakeup call after the request to write task configs is added to the herder's request queue verify(member).wakeup(); verifyNoMoreInteractions(member, taskConfigCb); + assertEquals( + Arrays.asList("awaiting startup"), + stages + ); } @Test @@ -2473,11 +2554,17 @@ public class DistributedHerderTest { configUpdateListener.onSessionKeyUpdate(sessionKey); Callback taskConfigCb = mock(Callback.class); + List stages = expectRecordStages(taskConfigCb); herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature); // Expect a wakeup call after the request to write task configs is added to the herder's request queue verify(member).wakeup(); verifyNoMoreInteractions(member, taskConfigCb); + + assertEquals( + Arrays.asList("awaiting startup"), + stages + ); } @Test @@ -2502,8 +2589,8 @@ public class DistributedHerderTest { herder.tick(); - verify(member, times(2)).ensureActive(); - verify(member, times(1)).poll(anyLong()); + verify(member, 
times(2)).ensureActive(any()); + verify(member, times(1)).poll(anyLong(), any()); verify(configBackingStore, times(2)).putSessionKey(any(SessionKey.class)); } @@ -2551,8 +2638,8 @@ public class DistributedHerderTest { herder.tick(); - verify(member, times(2)).ensureActive(); - verify(member, times(1)).poll(anyLong()); + verify(member, times(2)).ensureActive(any()); + verify(member, times(1)).poll(anyLong(), any()); verify(configBackingStore, times(1)).putSessionKey(any(SessionKey.class)); } @@ -2606,7 +2693,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); expectRebalance(1, Collections.emptyList(), Collections.emptyList()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); OngoingStubbing> expectRequest = when(restClient.httpRequest( any(), eq("PUT"), isNull(), isNull(), isNull(), any(), any() @@ -2695,7 +2782,7 @@ public class DistributedHerderTest { expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); if (expectTaskCountRecord) { doNothing().when(configBackingStore).putTaskCountRecord(CONN1, 1); @@ -2739,7 +2826,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(configState); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // The future returned by Worker::fenceZombies KafkaFuture workerFencingFuture = mock(KafkaFuture.class); @@ -2798,7 +2885,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(configState); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); Exception fencingException = new KafkaException("whoops!"); when(worker.fenceZombies(eq(CONN1), eq(2), eq(CONN1_CONFIG))).thenThrow(fencingException); @@ -2841,7 +2928,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(configState); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // The future returned by Worker::fenceZombies KafkaFuture workerFencingFuture = mock(KafkaFuture.class); @@ -2920,7 +3007,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(configState); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); // The callbacks that the herder has accrued for outstanding fencing futures, which will be completed after // a successful round of fencing and a task record write to the config topic @@ -3146,7 +3233,7 @@ public class DistributedHerderTest { // We should poll for less than the delay - time to start the connector, meaning that a long connector start // does not delay the poll timeout - verify(member, times(3)).poll(leq(maxPollWaitMs)); + verify(member, times(3)).poll(leq(maxPollWaitMs), any()); verify(worker, times(2)).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), any()); verifyNoMoreInteractions(member, worker, configBackingStore); } @@ -3228,13 +3315,13 @@ public class DistributedHerderTest { // 1. end of initial tick when no request has been added to the herder queue yet // 2. 
the third task reconfiguration request is expected to pass; so expect no more retries (a Long.MAX_VALUE poll // timeout indicates that there is no herder request currently in the queue) - verify(member, times(2)).poll(eq(Long.MAX_VALUE)); + verify(member, times(2)).poll(eq(Long.MAX_VALUE), any()); // task reconfiguration herder request with initial retry backoff - verify(member).poll(eq(250L)); + verify(member).poll(eq(250L), any()); // task reconfiguration herder request with double the initial retry backoff - verify(member).poll(eq(500L)); + verify(member).poll(eq(500L), any()); verifyNoMoreInteractions(member, worker, restClient); } @@ -3492,7 +3579,7 @@ public class DistributedHerderTest { expectConfigRefreshAndSnapshot(SNAPSHOT); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); @@ -3633,12 +3720,12 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // Now handle the alter connector offsets request - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(herder.connectorType(any())).thenReturn(ConnectorType.SOURCE); // Expect a round of zombie fencing to occur @@ -3699,12 +3786,12 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - doNothing().when(member).poll(anyLong()); + expectMemberPoll(); herder.tick(); // Now handle the reset connector offsets request - doNothing().when(member).ensureActive(); + expectMemberEnsureActive(); when(herder.connectorType(any())).thenReturn(ConnectorType.SOURCE); // Expect a round of zombie fencing to occur @@ -3735,6 +3822,22 @@ public class DistributedHerderTest { verifyNoMoreInteractions(workerFencingFuture, herderFencingFuture, member, worker); } + private void expectMemberPoll() { + ArgumentCaptor> onPoll = ArgumentCaptor.forClass(Supplier.class); + doAnswer(invocation -> { + onPoll.getValue().get().close(); + return null; + }).when(member).poll(anyLong(), onPoll.capture()); + } + + private void expectMemberEnsureActive() { + ArgumentCaptor> onPoll = ArgumentCaptor.forClass(Supplier.class); + doAnswer(invocation -> { + onPoll.getValue().get().close(); + return null; + }).when(member).ensureActive(onPoll.capture()); + } + private void expectRebalance(final long offset, final List assignedConnectors, final List assignedTasks) { @@ -3788,7 +3891,9 @@ public class DistributedHerderTest { final List assignedTasks, int delay, boolean isLeader) { + ArgumentCaptor> onPoll = ArgumentCaptor.forClass(Supplier.class); doAnswer(invocation -> { + onPoll.getValue().get().close(); ExtendedAssignment assignment; if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) { rebalanceListener.onRevoked(leader, revokedConnectors, revokedTasks); @@ -3808,7 +3913,7 @@ public class DistributedHerderTest { rebalanceListener.onAssigned(assignment, 3); time.sleep(100L); return null; - }).when(member).ensureActive(); + }).when(member).ensureActive(onPoll.capture()); if (isLeader) { doNothing().when(configBackingStore).claimWritePrivileges(); @@ -3961,6 +4066,20 @@ public class 
DistributedHerderTest { } } + private static List expectRecordStages(Callback callback) { + when(callback.chainStaging(any())).thenCallRealMethod(); + List result = Collections.synchronizedList(new ArrayList<>()); + + doAnswer(invocation -> { + Stage stage = invocation.getArgument(0); + if (stage != null) + result.add(stage.description()); + return null; + }).when(callback).recordStage(any()); + + return result; + } + // We need to use a real class here due to some issue with mocking java.lang.Class private abstract class BogusSourceConnector extends SourceConnector { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java index bf3b7361b55..e2daad66fca 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.kafka.test.TestUtils.waitForCondition; @@ -302,20 +303,16 @@ public class ConnectAssertions { */ public void assertConnectorAndAtLeastNumTasksAreRunning(String connectorName, int numTasks, String detailMessage) throws InterruptedException { - try { - waitForCondition( - () -> checkConnectorState( - connectorName, - AbstractStatus.State.RUNNING, - numTasks, - AbstractStatus.State.RUNNING, - (actual, expected) -> actual >= expected - ).orElse(false), - CONNECTOR_SETUP_DURATION_MS, - "The connector or at least " + numTasks + " of tasks are not running."); - } catch (AssertionError e) { - throw new AssertionError(detailMessage, e); - } + waitForConnectorState( + connectorName, + AbstractStatus.State.RUNNING, + atLeast(numTasks), + null, + AbstractStatus.State.RUNNING, + "The connector or at least " + numTasks + " of tasks are not running.", + detailMessage, + CONNECTOR_SETUP_DURATION_MS + ); } /** @@ -329,20 +326,16 @@ public class ConnectAssertions { */ public void assertConnectorAndExactlyNumTasksAreRunning(String connectorName, int numTasks, String detailMessage) throws InterruptedException { - try { - waitForCondition( - () -> checkConnectorState( - connectorName, - AbstractStatus.State.RUNNING, - numTasks, - AbstractStatus.State.RUNNING, - (actual, expected) -> actual == expected - ).orElse(false), - CONNECTOR_SETUP_DURATION_MS, - "The connector or exactly " + numTasks + " tasks are not running."); - } catch (AssertionError e) { - throw new AssertionError(detailMessage, e); - } + waitForConnectorState( + connectorName, + AbstractStatus.State.RUNNING, + exactly(numTasks), + null, + AbstractStatus.State.RUNNING, + "The connector or exactly " + numTasks + " tasks are not running.", + detailMessage, + CONNECTOR_SETUP_DURATION_MS + ); } /** @@ -356,20 +349,16 @@ public class ConnectAssertions { */ public void assertConnectorAndExactlyNumTasksArePaused(String connectorName, int numTasks, String detailMessage) throws InterruptedException { - try { - waitForCondition( - () -> checkConnectorState( - connectorName, - AbstractStatus.State.PAUSED, - numTasks, - AbstractStatus.State.PAUSED, - Integer::equals - ).orElse(false), - CONNECTOR_SHUTDOWN_DURATION_MS, - "The connector or exactly " + numTasks + " tasks are not paused."); - } catch 
    /**
@@ -356,20 +349,16 @@ public class ConnectAssertions {
      */
    public void assertConnectorAndExactlyNumTasksArePaused(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
-        try {
-            waitForCondition(
-                () -> checkConnectorState(
-                    connectorName,
-                    AbstractStatus.State.PAUSED,
-                    numTasks,
-                    AbstractStatus.State.PAUSED,
-                    Integer::equals
-                ).orElse(false),
-                CONNECTOR_SHUTDOWN_DURATION_MS,
-                "The connector or exactly " + numTasks + " tasks are not paused.");
-        } catch (AssertionError e) {
-            throw new AssertionError(detailMessage, e);
-        }
+        waitForConnectorState(
+            connectorName,
+            AbstractStatus.State.PAUSED,
+            exactly(numTasks),
+            null,
+            AbstractStatus.State.PAUSED,
+            "The connector or exactly " + numTasks + " tasks are not paused.",
+            detailMessage,
+            CONNECTOR_SHUTDOWN_DURATION_MS
+        );
     }
 
     /**
@@ -383,24 +372,20 @@ public class ConnectAssertions {
      */
     public void assertConnectorIsRunningAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
             throws InterruptedException {
-        try {
-            waitForCondition(
-                () -> checkConnectorState(
-                    connectorName,
-                    AbstractStatus.State.RUNNING,
-                    numTasks,
-                    AbstractStatus.State.FAILED,
-                    (actual, expected) -> actual >= expected
-                ).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS,
-                "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
-        } catch (AssertionError e) {
-            throw new AssertionError(detailMessage, e);
-        }
+        waitForConnectorState(
+            connectorName,
+            AbstractStatus.State.RUNNING,
+            exactly(numTasks),
+            null,
+            AbstractStatus.State.FAILED,
+            "Either the connector is not running or not all the " + numTasks + " tasks have failed.",
+            detailMessage,
+            CONNECTOR_SETUP_DURATION_MS
+        );
     }
 
     /**
-     * Assert that a connector is running, that it has a specific number of tasks out of that numFailedTasks are in the FAILED state.
+     * Assert that a connector is running, that it has a specific number of tasks, and out of those, numFailedTasks are in the FAILED state.
      *
      * @param connectorName the connector name
      * @param numTasks the number of tasks
@@ -410,21 +395,16 @@ public class ConnectAssertions {
      */
     public void assertConnectorIsRunningAndNumTasksHaveFailed(String connectorName, int numTasks, int numFailedTasks, String detailMessage)
             throws InterruptedException {
-        try {
-            waitForCondition(
-                () -> checkConnectorState(
-                    connectorName,
-                    AbstractStatus.State.RUNNING,
-                    numTasks,
-                    numFailedTasks,
-                    AbstractStatus.State.FAILED,
-                    (actual, expected) -> actual >= expected
-                ).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS,
-                "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
-        } catch (AssertionError e) {
-            throw new AssertionError(detailMessage, e);
-        }
+        waitForConnectorState(
+            connectorName,
+            AbstractStatus.State.RUNNING,
+            exactly(numTasks),
+            numFailedTasks,
+            AbstractStatus.State.FAILED,
+            "Either the connector is not running or not all the " + numTasks + " tasks have failed.",
+            detailMessage,
+            CONNECTOR_SETUP_DURATION_MS
+        );
     }
 
     /**
@@ -438,20 +418,16 @@ public class ConnectAssertions {
      */
     public void assertConnectorIsFailedAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
             throws InterruptedException {
-        try {
-            waitForCondition(
-                () -> checkConnectorState(
-                    connectorName,
-                    AbstractStatus.State.FAILED,
-                    numTasks,
-                    AbstractStatus.State.FAILED,
-                    (actual, expected) -> actual >= expected
-                ).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS,
-                "Either the connector is running or not all the " + numTasks + " tasks have failed.");
-        } catch (AssertionError e) {
-            throw new AssertionError(detailMessage, e);
-        }
+        waitForConnectorState(
+            connectorName,
+            AbstractStatus.State.FAILED,
+            exactly(numTasks),
+            null,
+            AbstractStatus.State.FAILED,
+            "Either the connector is running or not all the " + numTasks + " tasks have failed.",
+            detailMessage,
+            CONNECTOR_SETUP_DURATION_MS
+        );
     }
 
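Note the nullable numTasksInTasksState argument threaded through these assertions: null means "all tasks must be in the given state", while a concrete value pins the exact count. A hypothetical, self-contained sketch of that defaulting rule (the method and class names here are invented for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TaskStateCountSketch {

    // When expectedInState is null, default to the total task count, so
    // every task must match; otherwise exactly that many must match.
    static boolean tasksInState(List<String> taskStates, String target, Integer expectedInState) {
        int mustMatch = Optional.ofNullable(expectedInState).orElse(taskStates.size());
        return taskStates.stream().filter(target::equals).count() == mustMatch;
    }

    public static void main(String[] args) {
        List<String> states = Arrays.asList("RUNNING", "FAILED", "FAILED");
        System.out.println(tasksInState(states, "FAILED", 2));    // true
        System.out.println(tasksInState(states, "FAILED", null)); // false: only 2 of 3 match
    }
}
```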
    /**
@@ -500,82 +476,84 @@ public class ConnectAssertions {
      */
     public void assertConnectorIsStopped(String connectorName, String detailMessage)
             throws InterruptedException {
+        waitForConnectorState(
+            connectorName,
+            AbstractStatus.State.STOPPED,
+            exactly(0),
+            null,
+            null,
+            "At least the connector or one of its tasks is still running",
+            detailMessage,
+            CONNECTOR_SHUTDOWN_DURATION_MS
+        );
+    }
+
+    /**
+     * Wait for the connector to reach the given state, to have a total task count that satisfies
+     * the given predicate, and to have the expected number of tasks in the given task state.
+     * @param connectorName the connector to check
+     * @param connectorState the state the connector itself is expected to be in
+     * @param expectedNumTasks predicate that the total number of tasks must satisfy
+     * @param numTasksInTasksState the number of tasks expected to be in {@code tasksState}; if null, all tasks must be in that state
+     * @param tasksState the state the matching tasks are expected to be in
+     * @param conditionMessage description of the awaited condition, used if the wait times out
+     * @param detailMessage caller-supplied context for the resulting assertion error
+     * @param maxWaitMs maximum time to wait, in milliseconds
+     */
+    protected void waitForConnectorState(
+            String connectorName,
+            AbstractStatus.State connectorState,
+            Predicate<Integer> expectedNumTasks,
+            Integer numTasksInTasksState,
+            AbstractStatus.State tasksState,
+            String conditionMessage,
+            String detailMessage,
+            long maxWaitMs
+    ) throws InterruptedException {
+        AtomicReference<ConnectorStateInfo> lastInfo = new AtomicReference<>();
+        AtomicReference<Exception> lastInfoError = new AtomicReference<>();
         try {
             waitForCondition(
-                () -> checkConnectorState(
-                    connectorName,
-                    AbstractStatus.State.STOPPED,
-                    0,
-                    null,
-                    Integer::equals
-                ).orElse(false),
-                CONNECTOR_SHUTDOWN_DURATION_MS,
-                "At least the connector or one of its tasks is still running");
+                () -> {
+                    try {
+                        ConnectorStateInfo info = connect.connectorStatus(connectorName);
+                        lastInfo.set(info);
+                        lastInfoError.set(null);
+
+                        if (info == null)
+                            return false;
+
+                        int numTasks = info.tasks().size();
+                        int expectedTasksInState = Optional.ofNullable(numTasksInTasksState).orElse(numTasks);
+                        return expectedNumTasks.test(numTasks)
+                            && info.connector().state().equals(connectorState.toString())
+                            && info.tasks().stream().filter(s -> s.state().equals(tasksState.toString())).count() == expectedTasksInState;
+                    } catch (Exception e) {
+                        log.error("Could not check connector state info.", e);
+                        lastInfo.set(null);
+                        lastInfoError.set(e);
+                        return false;
+                    }
+                },
+                maxWaitMs,
+                () -> {
+                    String result = conditionMessage;
+                    if (lastInfo.get() != null) {
+                        return result + " When last checked, " + stateSummary(lastInfo.get());
+                    } else if (lastInfoError.get() != null) {
+                        result += " The last attempt to check the connector state failed: " + lastInfoError.get().getClass();
+                        String exceptionMessage = lastInfoError.get().getMessage();
+                        if (exceptionMessage != null) {
+                            result += ": " + exceptionMessage;
+                        }
+                        return result;
+                    } else {
+                        return result;
+                    }
+                }
+            );
         } catch (AssertionError e) {
             throw new AssertionError(detailMessage, e);
         }
     }
 
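The lastInfo/lastInfoError pair above implements a "remember the last observation" pattern: the polling lambda records what it saw (or why it failed), and the message supplier turns that into a descriptive timeout error instead of a bare assertion failure. A minimal, self-contained sketch of the pattern; waitForCondition here is a local stand-in for the test-utils method of the same name, and the DEGRADED state is sample data, not output from a real cluster:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class LastObservationSketch {

    // Poll a condition until a deadline; on timeout, build the failure
    // message lazily so it can describe the most recent observation.
    static void waitForCondition(Supplier<Boolean> condition, long maxWaitMs,
                                 Supplier<String> message) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        do {
            if (condition.get()) {
                return;
            }
            Thread.sleep(50);
        } while (System.currentTimeMillis() < deadline);
        throw new AssertionError(message.get());
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<String> lastSeen = new AtomicReference<>();
        try {
            waitForCondition(() -> {
                String state = "DEGRADED"; // pretend this was fetched via REST
                lastSeen.set(state);
                return "RUNNING".equals(state);
            }, 200, () -> "Connector never reached RUNNING."
                    + " When last checked, the connector was " + lastSeen.get());
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```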
-    /**
-     * Check whether the given connector state matches the current state of the connector and
-     * whether it has at least the given number of tasks, with all the tasks matching the given
-     * task state.
-     * @param connectorName the connector
-     * @param connectorState
-     * @param numTasks the expected number of tasks
-     * @param tasksState
-     * @return true if the connector and tasks are in the expected state; false otherwise
-     */
-    protected Optional<Boolean> checkConnectorState(
-            String connectorName,
-            AbstractStatus.State connectorState,
-            int numTasks,
-            AbstractStatus.State tasksState,
-            BiFunction<Integer, Integer, Boolean> comp
-    ) {
-        try {
-            ConnectorStateInfo info = connect.connectorStatus(connectorName);
-            boolean result = info != null
-                    && comp.apply(info.tasks().size(), numTasks)
-                    && info.connector().state().equals(connectorState.toString())
-                    && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
-            return Optional.of(result);
-        } catch (Exception e) {
-            log.error("Could not check connector state info.", e);
-            return Optional.empty();
-        }
-    }
-
-    /**
-     * Check whether the given connector state matches the current state of the connector and
-     * whether it has at least the given number of tasks, with numTasksInTasksState matching the given
-     * task state.
-     * @param connectorName the connector
-     * @param connectorState
-     * @param numTasks the expected number of tasks
-     * @param tasksState
-     * @return true if the connector and tasks are in the expected state; false otherwise
-     */
-    protected Optional<Boolean> checkConnectorState(
-            String connectorName,
-            AbstractStatus.State connectorState,
-            int numTasks,
-            int numTasksInTasksState,
-            AbstractStatus.State tasksState,
-            BiFunction<Integer, Integer, Boolean> comp
-    ) {
-        try {
-            ConnectorStateInfo info = connect.connectorStatus(connectorName);
-            boolean result = info != null
-                    && comp.apply(info.tasks().size(), numTasks)
-                    && info.connector().state().equals(connectorState.toString())
-                    && info.tasks().stream().filter(s -> s.state().equals(tasksState.toString())).count() == numTasksInTasksState;
-            return Optional.of(result);
-        } catch (Exception e) {
-            log.error("Could not check connector state info.", e);
-            return Optional.empty();
-        }
-    }
 
     /**
      * Assert that a connector's set of active topics matches the given collection of topic names.
      *
@@ -615,4 +593,24 @@ public class ConnectAssertions {
             return Optional.empty();
         }
     }
+
+    private static String stateSummary(ConnectorStateInfo info) {
+        String result = "the connector was " + info.connector().state();
+        if (info.tasks().isEmpty()) {
+            return result + " and no tasks were running";
+        } else {
+            String taskStates = info.tasks().stream()
+                    .map(ConnectorStateInfo.TaskState::state)
+                    .collect(Collectors.joining(", "));
+            return result + " and its tasks were in these states: " + taskStates;
+        }
+    }
+
+    private static Predicate<Integer> exactly(int expected) {
+        return actual -> actual == expected;
+    }
+
+    private static Predicate<Integer> atLeast(int expected) {
+        return actual -> actual >= expected;
+    }
 }
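The new stateSummary helper supplies the human-readable tail of the timeout message built in waitForConnectorState. A rough standalone analogue that shows the shape of its output, with hard-coded sample states rather than a live ConnectorStateInfo:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class StateSummarySketch {

    // Analogue of stateSummary: one readable line covering the connector
    // state plus the state of every task. Sample data only.
    static String stateSummary(String connectorState, List<String> taskStates) {
        String result = "the connector was " + connectorState;
        if (taskStates.isEmpty()) {
            return result + " and no tasks were running";
        }
        return result + " and its tasks were in these states: "
                + taskStates.stream().collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // "...the connector was RUNNING and its tasks were in these states: RUNNING, FAILED"
        System.out.println(stateSummary("RUNNING", Arrays.asList("RUNNING", "FAILED")));
        // "...the connector was STOPPED and no tasks were running"
        System.out.println(stateSummary("STOPPED", Collections.emptyList()));
    }
}
```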