diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ae13b9bb8a1..185c8dd283d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -169,7 +169,7 @@
               files="(RequestResponse|WorkerSinkTask)Test.java"/>
+              files="(DistributedHerder|Worker)Test.java"/>
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
+     * Note that altering / resetting offsets is expected to be an idempotent operation and this method should be able
+     * to handle being called more than once with the same arguments (which could occur if a user retries the request
+     * due to a failure in altering the consumer group offsets, for example).
+     * <p>
     * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
     * {@link #start(Map) start} method is invoked.
     *
     * @param connectorConfig the configuration of the connector
     * @param offsets a map from topic partition to offset, containing the offsets that the user has requested to
     *                alter/reset. For any topic partitions whose offsets are being reset instead of altered, their
-     *                corresponding value in the map will be {@code null}.
+     *                corresponding value in the map will be {@code null}. This map may be empty, but never null. An
+     *                empty offsets map could indicate that the offsets were reset previously or that no offsets have
+     *                been committed yet.
     * @return whether this method has been overridden by the connector; the default implementation returns
     *         {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
     *         {@code true}
     * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
     * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
     *         reset for any other reason (for example, they have failed custom validation logic specific to this connector)
+     * @since 3.6
     */
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
        return false;
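For connector developers, a minimal override of this new contract could look like the following sketch. The connector class and its validation rule are hypothetical; only the `alterOffsets` signature and the null-means-reset semantics come from the change above:

```java
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;

/**
 * Hypothetical sink connector showing one way to implement the new contract:
 * validate the requested offsets, stay idempotent, and return true to signal
 * that the override is intentional.
 */
public abstract class ExampleSinkConnector extends SinkConnector {

    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
        offsets.forEach((topicPartition, offset) -> {
            // A null offset means this partition is being reset; nothing to validate.
            if (offset != null && offset < 0) {
                throw new ConnectException("Offset for " + topicPartition + " must be non-negative, got " + offset);
            }
        });
        // No externally-managed offsets to clean up, so the framework-managed
        // consumer group offsets are the whole story: report success.
        return true;
    }
}
```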
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index eaaf56566c8..691476f77b0 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -85,19 +85,26 @@ public abstract class SourceConnector extends Connector {
     * returned by any {@link org.apache.kafka.connect.storage.OffsetStorageReader OffsetStorageReader instances}
     * provided to this connector and its tasks.
     * <p>
+     * Note that altering / resetting offsets is expected to be an idempotent operation and this method should be able
+     * to handle being called more than once with the same arguments (which could occur if a user retries the request
+     * due to a failure in writing the new offsets to the offsets store, for example).
+     * <p>
     * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
     * {@link #start(Map) start} method is invoked.
     *
     * @param connectorConfig the configuration of the connector
     * @param offsets a map from source partition to source offset, containing the offsets that the user has requested
     *                to alter/reset. For any source partitions whose offsets are being reset instead of altered, their
-     *                corresponding source offset value in the map will be {@code null}
+     *                corresponding source offset value in the map will be {@code null}. This map may be empty, but
+     *                never null. An empty offsets map could indicate that the offsets were reset previously or that no
+     *                offsets have been committed yet.
     * @return whether this method has been overridden by the connector; the default implementation returns
     *         {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
     *         {@code true}
     * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
     * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
     *         reset for any other reason (for example, they have failed custom validation logic specific to this connector)
+     * @since 3.6
     */
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
        return false;
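The source-side analog is most interesting for connectors that mirror offsets into an external system. A hedged sketch (the connector class and its `deleteCheckpoint` helper are assumptions, not part of this patch) of an idempotent implementation:

```java
import java.util.Map;
import org.apache.kafka.connect.source.SourceConnector;

/**
 * Hypothetical source connector that mirrors committed offsets into an external
 * system. Because alterOffsets may be retried, the cleanup below must be idempotent.
 */
public abstract class ExampleSourceConnector extends SourceConnector {

    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
        offsets.forEach((partition, offset) -> {
            if (offset == null) {
                // Reset request: deleteCheckpoint is an assumed, idempotent helper
                // that removes any externally-tracked state for this partition.
                deleteCheckpoint(connectorConfig, partition);
            }
        });
        return true;
    }

    // Assumed helper; a real connector would talk to whatever system it reads from.
    protected abstract void deleteCheckpoint(Map<String, String> config, Map<String, ?> partition);
}
```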
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 84bd0f81108..b1a19fd46ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.runtime.rest.entities.Message;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -902,4 +903,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             cb.onCompletion(t, null);
         }
     }
+
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        if (offsets == null || offsets.isEmpty()) {
+            callback.onCompletion(new ConnectException("The offsets to be altered may not be null or empty"), null);
+            return;
+        }
+        modifyConnectorOffsets(connName, offsets, callback);
+    }
+
+    @Override
+    public void resetConnectorOffsets(String connName, Callback<Message> callback) {
+        modifyConnectorOffsets(connName, null, callback);
+    }
+
+    /**
+     * Service external requests to alter or reset connector offsets.
+     * @param connName the name of the connector whose offsets are to be modified
+     * @param offsets the offsets to be written; this should be {@code null} for offsets reset requests
+     * @param cb callback to invoke upon completion
+     */
+    protected abstract void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 7d00dba3d35..a6f9adfced2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -311,6 +311,13 @@ public interface Herder {
      */
     void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
 
+    /**
+     * Reset a connector's offsets.
+     * @param connName the name of the connector whose offsets are to be reset
+     * @param cb callback to invoke upon completion
+     */
+    void resetConnectorOffsets(String connName, Callback<Message> cb);
+
     enum ConfigReloadAction {
         NONE,
         RESTART
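Callers of the new Herder method follow the same callback pattern as the existing offsets APIs. A sketch of how a caller might block on the result (the `herder` instance and the 90-second timeout are assumptions; the timeout simply mirrors the REST default):

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.util.FutureCallback;

public class ResetOffsetsExample {
    // 'herder' would be a live DistributedHerder or StandaloneHerder instance.
    static Message resetOffsets(Herder herder, String connectorName) throws Exception {
        FutureCallback<Message> cb = new FutureCallback<>();
        herder.resetConnectorOffsets(connectorName, cb);
        // Block until the herder thread services the request.
        return cb.get(90, TimeUnit.SECONDS);
    }
}
```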
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f52446f234e..bda9ecbaa85 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.FenceProducersOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
@@ -38,10 +39,13 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
@@ -92,6 +96,7 @@ import org.apache.kafka.connect.util.TopicCreationGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1201,6 +1206,7 @@ public class Worker {
             });
         } catch (Throwable t) {
             Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            log.error("Failed to retrieve consumer group offsets for sink connector {}", connName, t);
             cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, t), null);
         }
     }
@@ -1236,7 +1242,8 @@
                     .collect(Collectors.toList());
             cb.onCompletion(null, new ConnectorOffsets(connectorOffsets));
         } catch (Throwable t) {
-            cb.onCompletion(t, null);
+            log.error("Failed to retrieve offsets for source connector {}", connName, t);
+            cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to retrieve offsets for source connector " + connName), null);
         } finally {
             Utils.closeQuietly(offsetReader, "Offset reader for connector " + connName);
             Utils.closeQuietly(offsetStore::stop, "Offset store for connector " + connName);
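The `ConnectUtils.maybeWrap` call used in the new error path is an internal helper; a sketch of its assumed semantics (this is an illustration, not the actual Kafka implementation):

```java
import org.apache.kafka.connect.errors.ConnectException;

public class MaybeWrapSketch {
    // Assumed behavior of ConnectUtils.maybeWrap: preserve a ConnectException
    // as-is, wrap anything else with a contextual message.
    static ConnectException maybeWrap(Throwable t, String message) {
        if (t instanceof ConnectException) {
            return (ConnectException) t;
        }
        return new ConnectException(message, t);
    }
}
```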

@@ -1245,21 +1252,17 @@
     }
 
     /**
-     * Alter a connector's offsets.
+     * Modify (alter / reset) a connector's offsets.
      *
-     * @param connName the name of the connector whose offsets are to be altered
+     * @param connName the name of the connector whose offsets are to be modified
      * @param connectorConfig the connector's configurations
      * @param offsets a mapping from partitions (either source partitions for source connectors, or Kafka topic
-     *                partitions for sink connectors) to offsets that need to be written; may not be null or empty
+     *                partitions for sink connectors) to offsets that need to be written; this should be {@code null}
+     *                for offsets reset requests
      * @param cb callback to invoke upon completion
      */
-    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+    public void modifyConnectorOffsets(String connName, Map<String, String> connectorConfig,
                                       Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
-
-        if (offsets == null || offsets.isEmpty()) {
-            throw new ConnectException("The offsets to be altered may not be null or empty");
-        }
-
         String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
         ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
         Connector connector;
@@ -1267,40 +1270,34 @@
         try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
             connector = plugins.newConnector(connectorClassOrAlias);
             if (ConnectUtils.isSinkConnector(connector)) {
-                log.debug("Altering consumer group offsets for sink connector: {}", connName);
-                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                log.debug("Modifying offsets for sink connector: {}", connName);
+                modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
             } else {
-                log.debug("Altering offsets for source connector: {}", connName);
-                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                log.debug("Modifying offsets for source connector: {}", connName);
+                modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
             }
         }
     }
 
     /**
-     * Alter a sink connector's consumer group offsets.
+     * Modify (alter / reset) a sink connector's consumer group offsets.
      * <p>
      * Visible for testing.
      *
-     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connName the name of the sink connector whose offsets are to be modified
      * @param connector an instance of the sink connector
     * @param connectorConfig the sink connector's configuration
-     * @param offsets a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param offsets a mapping from topic partitions to offsets that need to be written; this should be {@code null}
+     *                for offsets reset requests
     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
     * @param cb callback to invoke upon completion
     */
-    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
-                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+    void modifySinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                    Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
            try {
-                Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(offsets);
-                boolean alterOffsetsResult;
-                try {
-                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
-                } catch (UnsupportedOperationException e) {
-                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
-                            "modification of offsets", e);
-                }
-
+                Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
+                boolean isReset = offsets == null;
                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
                Class<? extends Connector> sinkConnectorClass = connector.getClass();
                Map<String, Object> adminConfig = adminConfigs(
@@ -1320,89 +1317,192 @@
                Admin admin = adminFactory.apply(adminConfig);
 
                try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                                .timeoutMs((int) timer.remainingMs());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(timer.remainingMs(), TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
 
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
-                    }
-
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
-                    }
-
-                    @SuppressWarnings("rawtypes")
-                    KafkaFuture compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
-
-                    compositeAdminFuture.whenComplete((ignored, error) -> {
-                        if (error != null) {
-                            // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
-                            // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
-                            // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
-                            // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
-                            if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
-                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
-                                        "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
-                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
-                                        null);
-                            } else {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
-                            }
-                        } else {
-                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                            timer.update();
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting offsets for sink connector {}", connName, e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null);
+                            return;
                         }
-                    }).whenComplete((ignored, ignoredError) -> {
-                        // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
-                        // an exception itself, and we can thus ignore the error here
-                        Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
-                    });
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
+                    }
+
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets",
+                                connName, e);
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
+                    updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for sink connector " + connName);
 
+                    if (isReset) {
+                        resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult, timer);
+                    } else {
+                        alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult, timer);
+                    }
                } catch (Throwable t) {
-                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName);
                    throw t;
                }
            } catch (Throwable t) {
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+                log.error("Failed to modify offsets for sink connector {}", connName, t);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null);
            }
        }));
    }
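The Timer-based budgeting introduced above deserves a quick illustration: one budget is shared across several sequential blocking steps, each getting only what remains. A standalone sketch of the pattern (the 90-second budget is an assumption mirroring the REST default):

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class TimerPatternExample {
    public static void main(String[] args) throws InterruptedException {
        // One budget shared across several sequential blocking steps.
        Timer timer = Time.SYSTEM.timer(Duration.ofMillis(90_000));

        doBlockingStep(timer.remainingMs());  // step 1 gets the full budget
        timer.update();                       // record how much step 1 consumed
        if (timer.isExpired()) {
            throw new IllegalStateException("Timed out after the first step");
        }
        doBlockingStep(timer.remainingMs());  // step 2 only gets what's left
    }

    private static void doBlockingStep(long timeoutMs) throws InterruptedException {
        Thread.sleep(Math.min(timeoutMs, 10)); // stand-in for an admin client call
    }
}
```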
 
     /**
-     * Alter a source connector's offsets.
+     * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets}
+     * and / or {@link Admin#deleteConsumerGroupOffsets}.
      *
-     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; will be closed after use
+     * @param offsetsToWrite a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param cb callback to invoke upon completion
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector
+     * @param timer {@link Timer} to bound the total runtime of admin client requests
+     */
+    private void alterSinkConnectorOffsets(String connName, String groupId, Admin admin, Map<TopicPartition, Long> offsetsToWrite,
+                                           Callback<Message> cb, boolean alterOffsetsResult, Timer timer) {
+        List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
+
+        Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+
+        if (!offsetsToAlter.isEmpty()) {
+            log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
+                    connName, offsetsToAlter);
+            AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) timer.remainingMs());
+            AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                    alterConsumerGroupOffsetsOptions);
+
+            adminFutures.add(alterConsumerGroupOffsetsResult.all());
+        }
+
+        Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() == null)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+        if (!partitionsToReset.isEmpty()) {
+            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                    connName, partitionsToReset);
+            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) timer.remainingMs());
+            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                    deleteConsumerGroupOffsetsOptions);
+
+            adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+        }
+
+        @SuppressWarnings("rawtypes")
+        KafkaFuture compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
+
+        compositeAdminFuture.whenComplete((ignored, error) -> {
+            if (error != null) {
+                // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
+                // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
+                // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
+                if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
+                    String errorMsg = "Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                            "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                            "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                            "Connect cluster may need to be restarted to get rid of the zombie sink tasks.";
+                    log.error(errorMsg, error);
+                    cb.onCompletion(new ConnectException(errorMsg, error), null);
+                } else {
+                    log.error("Failed to alter consumer group offsets for connector {}", connName, error);
+                    cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
+                }
+            } else {
+                completeModifyOffsetsCallback(alterOffsetsResult, false, cb);
+            }
+        }).whenComplete((ignored, ignoredError) -> {
+            // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
+            // an exception itself, and we can thus ignore the error here
+            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+        });
+    }
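Because group deletion is the mechanism behind a reset (see the method added below), treating an already-deleted group as success is what makes retries safe. A standalone sketch of that idempotent deletion (bootstrap address and group ID are placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.common.errors.GroupIdNotFoundException;

public class IdempotentGroupDelete {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            try {
                admin.deleteConsumerGroups(
                        Collections.singleton("connect-my-sink"),
                        new DeleteConsumerGroupsOptions().timeoutMs(30_000))
                    .all()
                    .get();
            } catch (ExecutionException e) {
                // A missing group means a previous reset already succeeded: not an error.
                if (!(e.getCause() instanceof GroupIdNotFoundException)) {
                    throw e;
                }
            }
        }
    }
}
```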
 
+    /**
+     * Reset a sink connector's consumer group offsets. This is done by deleting the consumer group via a call to
+     * {@link Admin#deleteConsumerGroups}
+     *
+     * @param connName the name of the sink connector whose offsets are to be reset
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for resetting the consumer group offsets; will be closed after use
+     * @param cb callback to invoke upon completion
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector
+     * @param timer {@link Timer} to bound the total runtime of admin client requests
+     */
+    private void resetSinkConnectorOffsets(String connName, String groupId, Admin admin, Callback<Message> cb, boolean alterOffsetsResult, Timer timer) {
+        DeleteConsumerGroupsOptions deleteConsumerGroupsOptions = new DeleteConsumerGroupsOptions().timeoutMs((int) timer.remainingMs());
+
+        admin.deleteConsumerGroups(Collections.singleton(groupId), deleteConsumerGroupsOptions)
+                .all()
+                .whenComplete((ignored, error) -> {
+                    // We treat GroupIdNotFoundException as a non-error here because resetting a connector's offsets is expected to be an idempotent operation
+                    // and the consumer group could have already been deleted in a prior offsets reset request
+                    if (error != null && !(error instanceof GroupIdNotFoundException)) {
+                        // When a consumer group is non-empty, attempts to delete it via the admin client result in a GroupNotEmptyException. This can occur
+                        // if the sink tasks haven't stopped completely or if the connector is resumed while the reset offsets request is being processed
+                        if (error instanceof GroupNotEmptyException) {
+                            String errorMsg = "Failed to reset consumer group offsets for connector " + connName + " either because its tasks " +
+                                    "haven't stopped completely yet or the connector was resumed before the request to reset its offsets could be successfully " +
+                                    "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                    "Connect cluster may need to be restarted to get rid of the zombie sink tasks.";
+                            log.error(errorMsg, error);
+                            cb.onCompletion(new ConnectException(errorMsg, error), null);
+                        } else {
+                            log.error("Failed to reset consumer group offsets for sink connector {}", connName, error);
+                            cb.onCompletion(new ConnectException("Failed to reset consumer group offsets for sink connector " + connName, error), null);
+                        }
+                    } else {
+                        completeModifyOffsetsCallback(alterOffsetsResult, true, cb);
+                    }
+                }).whenComplete((ignored, ignoredError) -> {
+                    // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
+                    // an exception itself, and we can thus ignore the error here
+                    Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                });
+    }
+
+    /**
+     * Modify (alter / reset) a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be modified
      * @param connector an instance of the source connector
      * @param connectorConfig the source connector's configuration
-     * @param offsets a mapping from partitions to offsets that need to be written; may not be null or empty
+     * @param offsets a mapping from partitions to offsets that need to be written; this should be {@code null} for
+     *                offsets reset requests
      * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
      * @param cb callback to invoke upon completion
      */
-    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
-                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+    private void modifySourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                              Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
         SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
         Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
                 ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
@@ -1417,29 +1517,60 @@
         offsetStore.configure(config);
 
         OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
 
-        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+        modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
     }
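Given how many generic signatures this patch touches, it helps to see concrete values for the `Map<Map<String, ?>, Map<String, ?>>` shape that source offsets requests use. A sketch (the partition/offset keys are invented for a hypothetical file connector, not part of this patch):

```java
import java.util.HashMap;
import java.util.Map;

public class SourcePartitionOffsetShapes {
    public static void main(String[] args) {
        // A file-based source connector might track one partition per file...
        Map<String, ?> partition = Map.of("filename", "/tmp/input.txt");
        // ...with the offset recording the last byte read.
        Map<String, ?> alteredOffset = Map.of("position", 2048L);

        Map<Map<String, ?>, Map<String, ?>> alterRequest = new HashMap<>();
        alterRequest.put(partition, alteredOffset);

        // A reset is the same map with null values for every partition:
        Map<Map<String, ?>, Map<String, ?>> resetRequest = new HashMap<>();
        resetRequest.put(partition, null);
    }
}
```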
 
     // Visible for testing
-    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
-                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
-                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
-                                     ClassLoader connectorLoader, Callback<Message> cb) {
+    void modifySourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                      KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                      ClassLoader connectorLoader, Callback<Message> cb) {
         executor.submit(plugins.withClassLoader(connectorLoader, () -> {
             try {
-                boolean alterOffsetsResult;
-                try {
-                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
-                } catch (UnsupportedOperationException e) {
-                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
-                            "modification of offsets", e);
-                }
+                Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
 
                 // This reads to the end of the offsets topic and can be a potentially time-consuming operation
                 offsetStore.start();
+                updateTimerAndCheckExpiry(timer, "Timed out while trying to read to the end of the offsets topic prior to modifying " +
+                        "offsets for source connector " + connName);
 
+                Map<Map<String, ?>, Map<String, ?>> offsetsToWrite;
-                // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
+                // If the offsets argument is null, it indicates an offsets reset operation - i.e. a null offset should
+                // be written for every source partition of the connector
+                boolean isReset;
+                if (offsets == null) {
+                    isReset = true;
+                    offsetsToWrite = new HashMap<>();
+                    offsetStore.connectorPartitions(connName).forEach(partition -> offsetsToWrite.put(partition, null));
+                    log.debug("Found the following partitions (to reset offsets) for source connector {}: {}", connName, offsetsToWrite.keySet());
+                } else {
+                    isReset = false;
+                    offsetsToWrite = offsets;
+                }
+
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                } catch (UnsupportedOperationException e) {
+                    log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets",
+                            connName, e);
+                    throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+                updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for source connector " + connName);
+
+                // This should only occur for an offsets reset request when there are no source partitions found for the source connector in the
+                // offset store - either because there was a prior attempt to reset offsets or if there are no offsets committed by this source
+                // connector so far
+                if (offsetsToWrite.isEmpty()) {
+                    log.info("No offsets found for source connector {} - this can occur due to a prior attempt to reset offsets or if the " +
+                            "source connector hasn't committed any offsets yet", connName);
+                    completeModifyOffsetsCallback(alterOffsetsResult, isReset, cb);
+                    return;
+                }
+
+                // The modifySourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
                 // safe to write offsets via an offset writer
-                offsets.forEach(offsetWriter::offset);
+                offsetsToWrite.forEach(offsetWriter::offset);
 
                 // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
                 // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
@@ -1450,7 +1581,7 @@
                     producer.initTransactions();
                     producer.beginTransaction();
                 }
-                log.debug("Committing the following offsets for source connector {}: {}", connName, offsets);
+                log.debug("Committing the following offsets for source connector {}: {}", connName, offsetsToWrite);
                 FutureCallback<Void> offsetWriterCallback = new FutureCallback<>();
                 offsetWriter.doFlush(offsetWriterCallback);
                 if (config.exactlyOnceSourceEnabled()) {
@@ -1458,32 +1589,58 @@
                 try {
-                    offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                    offsetWriterCallback.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
                 } catch (ExecutionException e) {
-                    throw new ConnectException("Failed to alter offsets for source connector " + connName, e.getCause());
+                    throw new ConnectException("Failed to modify offsets for source connector " + connName, e.getCause());
                 } catch (TimeoutException e) {
-                    throw new ConnectException("Timed out while attempting to alter offsets for source connector " + connName, e);
+                    throw new ConnectException("Timed out while attempting to modify offsets for source connector " + connName, e);
                 } catch (InterruptedException e) {
-                    throw new ConnectException("Unexpectedly interrupted while attempting to alter offsets for source connector " + connName, e);
+                    throw new ConnectException("Unexpectedly interrupted while attempting to modify offsets for source connector " + connName, e);
                 }
 
-                completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                completeModifyOffsetsCallback(alterOffsetsResult, isReset, cb);
             } catch (Throwable t) {
-                log.error("Failed to alter offsets for source connector {}", connName, t);
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for source connector " + connName), null);
+                log.error("Failed to modify offsets for source connector {}", connName, t);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for source connector " + connName), null);
             } finally {
-                Utils.closeQuietly(offsetStore::stop, "Offset store for offset alter request for connector " + connName);
+                Utils.closeQuietly(offsetStore::stop, "Offset store for offset modification request for connector " + connName);
             }
         }));
     }
 
-    private void completeAlterOffsetsCallback(boolean alterOffsetsResult, Callback<Message> cb) {
+    /**
+     * Update the provided timer, check if it's expired and throw a {@link ConnectException} with the provided error
+     * message if it is.
+     *
+     * @param timer {@link Timer} to check
+     * @param errorMessageIfExpired error message indicating the cause for the timer expiry
+     * @throws ConnectException if the timer has expired
+     */
+    private void updateTimerAndCheckExpiry(Timer timer, String errorMessageIfExpired) {
+        timer.update();
+        if (timer.isExpired()) {
+            log.error(errorMessageIfExpired);
+            throw new ConnectException(errorMessageIfExpired);
+        }
+    }
+
+    /**
+     * Complete the alter / reset offsets callback with a potential-success or a definite-success message.
+     *
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} / {@link SourceConnector#alterOffsets}
+     * @param isReset whether this callback is for an offsets reset operation
+     * @param cb the callback to complete
+     *
+     * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect">KIP-875</a>
+     */
+    private void completeModifyOffsetsCallback(boolean alterOffsetsResult, boolean isReset, Callback<Message> cb) {
+        String modificationType = isReset ? "reset" : "altered";
         if (alterOffsetsResult) {
-            cb.onCompletion(null, new Message("The offsets for this connector have been altered successfully"));
+            cb.onCompletion(null, new Message("The offsets for this connector have been " + modificationType + " successfully"));
         } else {
             cb.onCompletion(null, new Message("The Connect framework-managed offsets for this connector have been " +
-                    "altered successfully. However, if this connector manages offsets externally, they will need to be " +
-                    "manually altered in the system that the connector uses."));
+                    modificationType + " successfully. However, if this connector manages offsets externally, they will need to be " +
+                    "manually " + modificationType + " in the system that the connector uses."));
         }
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index dac8e7b9bb7..5f497f1e118 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1522,60 +1522,59 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
-        log.trace("Submitting alter offsets request for connector '{}'", connName);
+    protected void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting {} offsets request for connector '{}'", offsets == null ? "reset" : "alter", connName);
 
         addRequest(() -> {
-            if (!alterConnectorOffsetsChecks(connName, callback)) {
+            if (!modifyConnectorOffsetsChecks(connName, callback)) {
                 return null;
             }
-            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // At this point, we should be the leader (the call to modifyConnectorOffsetsChecks makes sure of that) and can safely run
             // a zombie fencing request
             if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
-                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once support enabled.", connName);
+                log.debug("Performing a round of zombie fencing before modifying offsets for source connector {} with exactly-once support enabled.", connName);
                 doFenceZombieSourceTasks(connName, (error, ignored) -> {
                     if (error != null) {
-                        log.error("Failed to perform zombie fencing for source connector prior to altering offsets", error);
-                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for source connector prior to altering offsets",
-                                error), null);
+                        log.error("Failed to perform zombie fencing for source connector prior to modifying offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for source connector prior to modifying offsets", error), null);
                     } else {
-                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
-                        // We need to ensure that we perform the necessary checks again before proceeding to actually altering the connector offsets since
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to modify offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again before proceeding to actually altering / resetting the connector offsets since
                         // zombie fencing is done asynchronously and the conditions could have changed since the previous check
                         addRequest(() -> {
-                            if (alterConnectorOffsetsChecks(connName, callback)) {
-                                worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
+                            if (modifyConnectorOffsetsChecks(connName, callback)) {
+                                worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
                             }
                             return null;
                         }, forwardErrorCallback(callback));
                     }
                 });
             } else {
-                worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
+                worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
             }
             return null;
         }, forwardErrorCallback(callback));
     }
 
     /**
-     * This method performs a few checks for alter connector offsets request and completes the callback exceptionally
-     * if any check fails.
-     * @param connName the name of the connector whose offsets are to be altered
+     * This method performs a few checks for external requests to modify (alter or reset) connector offsets and
+     * completes the callback exceptionally if any check fails.
+     * @param connName the name of the connector whose offsets are to be modified
      * @param callback callback to invoke upon completion
     * @return true if all the checks passed, false otherwise
     */
-    private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> callback) {
+    private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> callback) {
        if (checkRebalanceNeeded(callback)) {
            return false;
        }
 
        if (!isLeader()) {
-            callback.onCompletion(new NotLeaderException("Only the leader can process alter offsets requests", leaderUrl()), null);
+            callback.onCompletion(new NotLeaderException("Only the leader can process external offsets modification requests", leaderUrl()), null);
            return false;
        }
 
        if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
-            throw new ConnectException("Failed to read to end of config topic before altering connector offsets");
+            throw new ConnectException("Failed to read to end of config topic before modifying connector offsets");
        }
 
        if (!configState.contains(connName)) {
@@ -1586,10 +1585,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above),
         // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop).
         // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled
-        // naturally because requests to alter consumer group offsets will fail if there are still active members in the group.
+        // naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members
+        // in the group.
         if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
-            callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " +
-                    "can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
+            callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified. This can be done " +
+                    "for the specified connector by issuing a 'PUT' request to the '/connectors/" + connName + "/stop' endpoint"), null);
             return false;
         }
         return true;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java
index 0b5bcd6588a..2e0b5457641 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 
 import java.util.HashMap;
 import java.util.List;
@@ -45,6 +46,9 @@ import java.util.Objects;
  *     ]
  * }
  * </pre>
+ *
+ * @see ConnectorsResource#getOffsets
+ * @see ConnectorsResource#alterConnectorOffsets
  */
 public class ConnectorOffsets {
     private final List<ConnectorOffset> offsets;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index b930ed1fbe7..646fb039256 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -367,6 +367,18 @@ public class ConnectorsResource implements ConnectResource {
         return Response.ok().entity(msg).build();
     }
 
+    @DELETE
+    @Path("/{connector}/offsets")
+    @Operation(summary = "Reset the offsets for the specified connector")
+    public Response resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
+                                          final @Context HttpHeaders headers, final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<Message> cb = new FutureCallback<>();
+        herder.resetConnectorOffsets(connector, cb);
+        Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null,
+                new TypeReference<Message>() { }, new IdentityTranslator<>(), forward);
+        return Response.ok().entity(msg).build();
+    }
+
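From a client's perspective the new endpoint is a plain HTTP DELETE. A hedged sketch with java.net.http (the worker address and connector name are placeholders):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ResetOffsetsClient {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-sink/offsets"))
                .DELETE()
                .build();
        // The connector must already be in the STOPPED state, or the worker
        // responds with a 400 Bad Request explaining the precondition.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}
```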
     // Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
     // object. Throw BadRequestException on mismatch, otherwise put connectorName in config
     private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 096a3f53d47..c7fee9e6715 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -374,19 +374,34 @@ public class StandaloneHerder extends AbstractHerder {
     }
 
     @Override
-    public synchronized void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+    protected synchronized void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        if (!modifyConnectorOffsetsChecks(connName, cb)) {
+            return;
+        }
+
+        worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
+    }
+
+    /**
+     * This method performs a few checks for external requests to modify (alter or reset) connector offsets and
+     * completes the callback exceptionally if any check fails.
+     * @param connName the name of the connector whose offsets are to be modified
+     * @param cb callback to invoke upon completion
+     * @return true if all the checks passed, false otherwise
+     */
+    private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> cb) {
         if (!configState.contains(connName)) {
             cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
-            return;
+            return false;
         }
 
         if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
-            cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. " +
-                    "This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
-            return;
+            cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified. This can be done " +
+                    "for the specified connector by issuing a 'PUT' request to the '/connectors/" + connName + "/stop' endpoint"), null);
+            return false;
         }
 
-        worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
+        return true;
     }
 
     private void startConnector(String connName, Callback<TargetState> onStart) {
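Before the SinkUtils javadoc touch-up below, it's worth seeing the request shape that `parseSinkConnectorOffsets` validates. A sketch of a map that should satisfy it (the `kafka_topic`/`kafka_partition`/`kafka_offset` keys are the ones SinkUtils uses; the values are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class SinkOffsetsRequestShape {
    public static void main(String[] args) {
        // Partition map: identifies a Kafka topic partition of the sink connector.
        Map<String, Object> partition = new HashMap<>();
        partition.put("kafka_topic", "test-topic");
        partition.put("kafka_partition", 0);

        // Offset map: the desired committed offset; a null offset map instead
        // means "reset this partition".
        Map<String, Object> offset = new HashMap<>();
        offset.put("kafka_offset", 100);

        Map<Map<String, ?>, Map<String, ?>> request = new HashMap<>();
        request.put(partition, offset);
    }
}
```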
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
index 57703f07a3e..70bcf8c427e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
@@ -73,8 +73,8 @@ public final class SinkUtils {
      * and then parse them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
      * valued offsets.
      *
-     * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
-     * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+     * @param partitionOffsets the partitions to offset map that needs to be validated and parsed; may not be null or empty
+     * @return the parsed mapping from {@link TopicPartition}s to their corresponding {@link Long} valued offsets; may not be null or empty
      *
      * @throws BadRequestException if the provided offsets aren't in the expected format
      */
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index b52b55a7b2a..ad2a5f168ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -67,29 +67,33 @@ import static org.junit.Assert.assertTrue;
  */
 @Category(IntegrationTest.class)
 public class OffsetsApiIntegrationTest {
-
-    private static final String CONNECTOR_NAME = "test-connector";
-    private static final String TOPIC = "test-topic";
-    private static final Integer NUM_TASKS = 2;
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final int NUM_TASKS = 2;
+    private static final int NUM_RECORDS_PER_PARTITION = 10;
     private Map<String, String> workerProps;
+    private EmbeddedConnectCluster.Builder connectBuilder;
     private EmbeddedConnectCluster connect;
 
     @Before
     public void setup() {
+        Properties brokerProps = new Properties();
+        brokerProps.put("transaction.state.log.replication.factor", "1");
+        brokerProps.put("transaction.state.log.min.isr", "1");
+
         // setup Connect worker properties
         workerProps = new HashMap<>();
         workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
 
         // build a Connect cluster backed by Kafka and Zk
-        connect = new EmbeddedConnectCluster.Builder()
+        connectBuilder = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
-                .workerProps(workerProps)
-                .build();
-        connect.start();
+                .brokerProps(brokerProps)
+                .workerProps(workerProps);
     }
 
     @After
@@ -99,6 +103,8 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testGetNonExistentConnectorOffsets() {
+        connect = connectBuilder.build();
+        connect.start();
         ConnectRestException e = assertThrows(ConnectRestException.class,
                 () -> connect.connectorOffsets("non-existent-connector"));
         assertEquals(404, e.errorCode());
@@ -106,11 +112,15 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testGetSinkConnectorOffsets() throws Exception {
+        connect = connectBuilder.build();
+        connect.start();
         getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
     }
 
     @Test
     public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+        connect = connectBuilder.build();
+        connect.start();
         Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
         connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
                 "overridden-group-id");
@@ -126,6 +136,8 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+        connect = connectBuilder.build();
+        connect.start();
         EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -144,9 +156,9 @@ public class OffsetsApiIntegrationTest { private void getAndVerifySinkConnectorOffsets(Map connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { kafkaCluster.createTopic(TOPIC, 5); - // Produce 10 messages to each partition + // Produce records to each partition for (int partition = 0; partition < 5; partition++) { - for (int message = 0; message < 10; message++) { + for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { kafkaCluster.produce(TOPIC, partition, "key", "value"); } } @@ -156,27 +168,31 @@ public class OffsetsApiIntegrationTest { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time."); - waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", 5, 10, + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); - // Produce 10 more messages to each partition + // Produce more records to each partition for (int partition = 0; partition < 5; partition++) { - for (int message = 0; message < 10; message++) { + for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { kafkaCluster.produce(TOPIC, partition, "key", "value"); } } - waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", 5, 20, + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, 2 * NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); } @Test public void testGetSourceConnectorOffsets() throws Exception { + connect = connectBuilder.build(); + connect.start(); getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } @Test public void testGetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { + connect = connectBuilder.build(); + connect.start(); Map connectorConfigs = baseSourceConnectorConfigs(); connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); getAndVerifySourceConnectorOffsets(connectorConfigs); @@ -184,6 +200,8 @@ public class OffsetsApiIntegrationTest { @Test public void testGetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + connect = connectBuilder.build(); + connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -205,19 +223,21 @@ public class OffsetsApiIntegrationTest { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time."); - waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 10, + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); - // Each task should produce 10 more records - connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, "20"); + // Each task should produce more records + connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(2 * NUM_RECORDS_PER_PARTITION)); connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 20, + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION, "Source connector 
offsets should reflect the expected number of records produced"); } @Test public void testAlterOffsetsNonExistentConnector() throws Exception { + connect = connectBuilder.build(); + connect.start(); ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.alterConnectorOffsets("non-existent-connector", new ConnectorOffsets(Collections.singletonList( new ConnectorOffset(Collections.emptyMap(), Collections.emptyMap()))))); @@ -226,6 +246,8 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterOffsetsNonStoppedConnector() throws Exception { + connect = connectBuilder.build(); + connect.start(); // Create source connector connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs()); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, @@ -260,11 +282,15 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterSinkConnectorOffsets() throws Exception { + connect = connectBuilder.build(); + connect.start(); alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); } @Test public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + connect = connectBuilder.build(); + connect.start(); Map connectorConfigs = baseSinkConnectorConfigs(); connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, "overridden-group-id"); @@ -279,6 +305,8 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + connect = connectBuilder.build(); + connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -296,12 +324,11 @@ public class OffsetsApiIntegrationTest { private void alterAndVerifySinkConnectorOffsets(Map connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { int numPartitions = 3; - int numMessages = 10; kafkaCluster.createTopic(TOPIC, numPartitions); - // Produce numMessages messages to each partition + // Produce records to each partition for (int partition = 0; partition < numPartitions; partition++) { - for (int message = 0; message < numMessages; message++) { + for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { kafkaCluster.produce(TOPIC, partition, "key", "value"); } } @@ -310,7 +337,7 @@ public class OffsetsApiIntegrationTest { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time."); - waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, numMessages, + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); connect.stopConnector(CONNECTOR_NAME); @@ -337,7 +364,7 @@ public class OffsetsApiIntegrationTest { assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. 
" + "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses.")); - waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions - 1, 5, + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 5, "Sink connector consumer group offsets should reflect the altered offsets"); // Update the connector's configs; this time expect SinkConnector::alterOffsets to return true @@ -356,7 +383,7 @@ public class OffsetsApiIntegrationTest { response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); assertThat(response, containsString("The offsets for this connector have been altered successfully")); - waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions - 1, 3, + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 3, "Sink connector consumer group offsets should reflect the altered offsets"); // Resume the connector and expect its offsets to catch up to the latest offsets @@ -366,16 +393,18 @@ public class OffsetsApiIntegrationTest { NUM_TASKS, "Connector tasks did not resume in time" ); - waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, 10, + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); } @Test public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception { + connect = connectBuilder.build(); + connect.start(); connect.kafka().createTopic(TOPIC, 1); - // Produce 10 messages - for (int message = 0; message < 10; message++) { + // Produce records + for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { connect.kafka().produce(TOPIC, 0, "key", "value"); } @@ -404,6 +433,8 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterSinkConnectorOffsetsInvalidRequestBody() throws Exception { + connect = connectBuilder.build(); + connect.start(); // Create a sink connector and stop it connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorConfigs()); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, @@ -466,18 +497,24 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterSourceConnectorOffsets() throws Exception { - alterAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs()); + connect = connectBuilder.build(); + connect.start(); + alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } @Test public void testAlterSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { + connect = connectBuilder.build(); + connect.start(); Map connectorConfigs = baseSourceConnectorConfigs(); connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); - alterAndVerifySourceConnectorOffsets(connect, connectorConfigs); + alterAndVerifySourceConnectorOffsets(connectorConfigs); } @Test public void testAlterSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + connect = connectBuilder.build(); + connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -489,36 +526,26 @@ public class OffsetsApiIntegrationTest { connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.bootstrapServers()); - 
alterAndVerifySourceConnectorOffsets(connect, connectorConfigs); + alterAndVerifySourceConnectorOffsets(connectorConfigs); } } @Test public void testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception { - Properties brokerProps = new Properties(); - brokerProps.put("transaction.state.log.replication.factor", "1"); - brokerProps.put("transaction.state.log.min.isr", "1"); workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); - EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder() - .name("connect-cluster") - .brokerProps(brokerProps) - .numWorkers(NUM_WORKERS) - .workerProps(workerProps) - .build(); - exactlyOnceSupportEnabledConnectCluster.start(); + connect = connectBuilder.workerProps(workerProps).build(); + connect.start(); - try (AutoCloseable ignored = exactlyOnceSupportEnabledConnectCluster::stop) { - alterAndVerifySourceConnectorOffsets(exactlyOnceSupportEnabledConnectCluster, baseSourceConnectorConfigs()); - } + alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } - public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect, Map connectorConfigs) throws Exception { + public void alterAndVerifySourceConnectorOffsets(Map connectorConfigs) throws Exception { // Create source connector connect.configureConnector(CONNECTOR_NAME, connectorConfigs); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time."); - waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 10, + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); connect.stopConnector(CONNECTOR_NAME); @@ -540,7 +567,7 @@ public class OffsetsApiIntegrationTest { assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. 
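Source-connector offsets, by contrast, are free-form maps whose keys the connector itself defines. For the MonitorableSourceConnector used throughout this suite, the entries patched by these tests are shaped roughly as follows (the task.id / saved keys are that test connector's own convention, not public API):

    // Illustrative sketch: MonitorableSourceConnector keeps one source partition
    // per task and tracks progress under a numeric offset key, so an alter
    // request patches entries along these lines.
    Map<String, Object> sourcePartition = Collections.singletonMap("task.id", CONNECTOR_NAME + "-0");
    Map<String, Object> sourceOffset = Collections.singletonMap("saved", 5);
    ConnectorOffsets requestBody = new ConnectorOffsets(
            Collections.singletonList(new ConnectorOffset(sourcePartition, sourceOffset)));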
" + "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses.")); - waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 5, + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 5, "Source connector offsets should reflect the altered offsets"); // Update the connector's configs; this time expect SourceConnector::alterOffsets to return true @@ -560,7 +587,7 @@ public class OffsetsApiIntegrationTest { response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); assertThat(response, containsString("The offsets for this connector have been altered successfully")); - waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 7, + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 7, "Source connector offsets should reflect the altered offsets"); // Resume the connector and expect its offsets to catch up to the latest offsets @@ -570,7 +597,277 @@ public class OffsetsApiIntegrationTest { NUM_TASKS, "Connector tasks did not resume in time" ); - waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 10, + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, + "Source connector offsets should reflect the expected number of records produced"); + } + + @Test + public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception { + connect = connectBuilder.build(); + connect.start(); + // Create a source connector and stop it + connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs()); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME)); + + String content = "[]"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + + content = "{}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(400, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request")); + } + + content = "{\"key\": []}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Unrecognized field")); + } + + content = "{\"offsets\": []}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(400, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request")); + } + + content = "{\"offsets\": {}}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + + content = "{\"offsets\": [123]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot construct instance")); + } + + content = 
"{\"offsets\": [{\"key\": \"val\"}]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Unrecognized field")); + } + + content = "{\"offsets\": [{\"partition\": []]}]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + } + + @Test + public void testResetSinkConnectorOffsets() throws Exception { + connect = connectBuilder.build(); + connect.start(); + resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); + } + + @Test + public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + connect = connectBuilder.build(); + connect.start(); + Map connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + "overridden-group-id"); + resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); + // Ensure that the overridden consumer group ID was the one actually used + try (Admin admin = connect.kafka().createAdminClient()) { + Collection consumerGroups = admin.listConsumerGroups().all().get(); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + } + } + + @Test + public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + connect = connectBuilder.build(); + connect.start(); + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster); + } + } + + private void resetAndVerifySinkConnectorOffsets(Map connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { + int numPartitions = 3; + kafkaCluster.createTopic(TOPIC, numPartitions); + + // Produce records to each partition + for (int partition = 0; partition < numPartitions; partition++) { + for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { + kafkaCluster.produce(TOPIC, partition, "key", "value"); + } + } + // Create sink connector + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Reset the sink connector's offsets + String response = 
connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + verifyEmptyConnectorOffsets(CONNECTOR_NAME); + + // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error + response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + verifyEmptyConnectorOffsets(CONNECTOR_NAME); + + // Resume the connector and expect its offsets to catch up to the latest offsets + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector tasks did not resume in time" + ); + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + } + + @Test + public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { + connect = connectBuilder.build(); + connect.start(); + connect.kafka().createTopic(TOPIC, 1); + + // Produce records + for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { + connect.kafka().produce(TOPIC, 0, "key", "value"); + } + + // Configure a sink connector whose sink task blocks in its stop method + Map connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, TOPIC); + connectorConfigs.put("block", "Task::stop"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector tasks did not start in time."); + + verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, NUM_RECORDS_PER_PARTITION, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + + // Try to reset the offsets + ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); + assertThat(e.getMessage(), containsString("zombie sink task")); + } + + @Test + public void testResetSourceConnectorOffsets() throws Exception { + connect = connectBuilder.build(); + connect.start(); + resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); + } + + @Test + public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { + connect = connectBuilder.build(); + connect.start(); + Map connectorConfigs = baseSourceConnectorConfigs(); + connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); + resetAndVerifySourceConnectorOffsets(connectorConfigs); + } + + @Test + public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + connect = connectBuilder.build(); + connect.start(); + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map connectorConfigs = 
baseSourceConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySourceConnectorOffsets(connectorConfigs); + } + } + + @Test + public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception { + workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + connect = connectBuilder.workerProps(workerProps).build(); + connect.start(); + + resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); + } + + public void resetAndVerifySourceConnectorOffsets(Map connectorConfigs) throws Exception { + // Create source connector + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, + "Source connector offsets should reflect the expected number of records produced"); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Reset the source connector's offsets + String response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + verifyEmptyConnectorOffsets(CONNECTOR_NAME); + + // Reset the source connector's offsets again while it is still in a STOPPED state and ensure that there is no error + response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. 
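Resetting twice while STOPPED, as this helper does, is valid only because offset modification is specified to be an idempotent operation. A connector that mirrors its offsets to an external system could honor that contract along these lines (externalStore is a hypothetical client, shown purely for illustration):

    // Hypothetical sketch of an idempotent alterOffsets override. A null offset
    // value for a partition means "reset"; deleting an entry that is already
    // gone must be a no-op so that retried requests also succeed.
    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
        offsets.forEach((partition, offset) -> {
            if (offset == null) {
                externalStore.delete(partition);   // no-op when nothing is stored
            } else {
                externalStore.put(partition, offset);
            }
        });
        return true;
    }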
" + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + verifyEmptyConnectorOffsets(CONNECTOR_NAME); + + // Resume the connector and expect its offsets to catch up to the latest offsets + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector tasks did not resume in time" + ); + verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); } @@ -590,7 +887,7 @@ public class OffsetsApiIntegrationTest { props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS)); props.put(TOPIC_CONFIG, TOPIC); props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "3"); - props.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, "10"); + props.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(NUM_RECORDS_PER_PARTITION)); props.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, "1"); @@ -600,8 +897,8 @@ public class OffsetsApiIntegrationTest { /** * Verify whether the actual consumer group offsets for a sink connector match the expected offsets. The verification - * is done using the `GET /connectors/{connector}/offsets` REST API which is repeatedly queried until the offsets match - * or the {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes the following: + * is done using the GET /connectors/{connector}/offsets REST API which is repeatedly queried + * until the offsets match or the {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes the following: *

     * <ol>
     *     <li>The sink connector is consuming from a single Kafka topic</li>
     *     <li>The expected offset for each partition in the topic is the same</li>
     * </ol>
     @@ -615,8 +912,8 @@ public class OffsetsApiIntegrationTest { * 10 records) * @throws InterruptedException if the thread is interrupted while waiting for the actual offsets to match the expected offsets */ - private void waitForExpectedSinkConnectorOffsets(String connectorName, String expectedTopic, int expectedPartitions, - int expectedOffset, String conditionDetails) throws InterruptedException { + private void verifyExpectedSinkConnectorOffsets(String connectorName, String expectedTopic, int expectedPartitions, + int expectedOffset, String conditionDetails) throws InterruptedException { TestUtils.waitForCondition(() -> { ConnectorOffsets offsets = connect.connectorOffsets(connectorName); if (offsets.offsets().size() != expectedPartitions) {
     @@ -634,11 +931,10 @@ public class OffsetsApiIntegrationTest { /** * Verify whether the actual offsets for a source connector match the expected offsets. The verification is done using the - * `GET /connectors/{connector}/offsets` REST API which is repeatedly queried until the offsets match or the - * {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes that the source connector is a + * GET /connectors/{connector}/offsets REST API which is repeatedly queried until the offsets match + * or the {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes that the source connector is a * {@link MonitorableSourceConnector} * - * @param connect the Connect cluster that is running the source connector * @param connectorName the name of the source connector whose offsets are to be verified * @param numTasks the number of tasks for the source connector * @param expectedOffset the expected offset for each source partition *
     @@ -646,8 +942,8 @@ public class OffsetsApiIntegrationTest { * 10 records) * @throws InterruptedException if the thread is interrupted while waiting for the actual offsets to match the expected offsets */ - private void waitForExpectedSourceConnectorOffsets(EmbeddedConnectCluster connect, String connectorName, int numTasks, - int expectedOffset, String conditionDetails) throws InterruptedException { + private void verifyExpectedSourceConnectorOffsets(String connectorName, int numTasks, + int expectedOffset, String conditionDetails) throws InterruptedException { TestUtils.waitForCondition(() -> { ConnectorOffsets offsets = connect.connectorOffsets(connectorName); // The MonitorableSourceConnector has a source partition per task
     @@ -663,4 +959,19 @@ public class OffsetsApiIntegrationTest { return true; }, OFFSET_READ_TIMEOUT_MS, conditionDetails); } + + /** + * Verify whether the GET /connectors/{connector}/offsets returns empty offsets for a source + * or sink connector whose offsets have been reset via the DELETE /connectors/{connector}/offsets + * REST API + * + * @param connectorName the name of the connector whose offsets are to be verified + * @throws InterruptedException if the thread is interrupted while waiting for the offsets to be empty + */ + private void verifyEmptyConnectorOffsets(String connectorName) throws InterruptedException { + TestUtils.waitForCondition(() -> { + ConnectorOffsets offsets = connect.connectorOffsets(connectorName); + return offsets.offsets().isEmpty(); + }, OFFSET_READ_TIMEOUT_MS, "Connector offsets should be empty after resetting offsets"); + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index fa9d87c29e1..646ced752be 100644 ---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions; +import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; import org.apache.kafka.clients.admin.FenceProducersResult; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; @@ -61,6 +63,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.Message; +import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -89,6 +92,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockedConstruction; import org.mockito.Mockito; @@ -134,6 +138,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GRO import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -144,6 +149,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anySet; @@ -1819,16 +1825,24 @@ public class WorkerTest { verifyKafkaClusterId(); } - @SuppressWarnings("unchecked") private void mockAdminListConsumerGroupOffsets(Admin admin, Map consumerGroupOffsets, Exception e) { + mockAdminListConsumerGroupOffsets(admin, consumerGroupOffsets, e, null, 0); + } + + private void mockAdminListConsumerGroupOffsets(Admin admin, Map consumerGroupOffsets, Exception e, Time time, long delayMs) { ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class); when(admin.listConsumerGroupOffsets(anyString(), any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result); - KafkaFuture> adminFuture = mock(KafkaFuture.class); - when(result.partitionsToOffsetAndMetadata()).thenReturn(adminFuture); - when(adminFuture.whenComplete(any())).thenAnswer(invocation -> { - ((KafkaFuture.BiConsumer, Throwable>) invocation.getArgument(0)) - .accept(consumerGroupOffsets, e); - 
return null; + KafkaFutureImpl> adminFuture = new KafkaFutureImpl<>(); + if (e != null) { + adminFuture.completeExceptionally(e); + } else { + adminFuture.complete(consumerGroupOffsets); + } + when(result.partitionsToOffsetAndMetadata()).thenAnswer(invocation -> { + if (time != null) { + time.sleep(delayMs); + } + return adminFuture; }); } @@ -1915,13 +1929,13 @@ public class WorkerTest { "support altering of offsets")); FutureCallback cb = new FutureCallback<>(); - worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps, + worker.modifyConnectorOffsets(CONNECTOR_ID, connectorProps, Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), cb); ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); assertEquals(ConnectException.class, e.getCause().getClass()); - assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + " because it doesn't support external modification of offsets", + assertEquals("Failed to modify offsets for connector " + CONNECTOR_ID + " because it doesn't support external modification of offsets", e.getCause().getMessage()); verifyGenericIsolation(); @@ -1952,7 +1966,7 @@ public class WorkerTest { }); FutureCallback cb = new FutureCallback<>(); - worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, offsetWriter, Thread.currentThread().getContextClassLoader(), cb); assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); @@ -1988,7 +2002,7 @@ public class WorkerTest { }); FutureCallback cb = new FutureCallback<>(); - worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, offsetWriter, Thread.currentThread().getContextClassLoader(), cb); ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message()); assertEquals(ConnectException.class, e.getCause().getClass()); @@ -2002,7 +2016,7 @@ public class WorkerTest { } @Test - public void testAlterOffsetsSinkConnectorNoResets() throws Exception { + public void testAlterOffsetsSinkConnectorNoDeletes() throws Exception { @SuppressWarnings("unchecked") ArgumentCaptor> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); Map, Map> partitionOffsets = new HashMap<>(); @@ -2024,7 +2038,7 @@ public class WorkerTest { } @Test - public void testAlterOffsetSinkConnectorOnlyResets() throws Exception { + public void testAlterOffsetSinkConnectorOnlyDeletes() throws Exception { @SuppressWarnings("unchecked") ArgumentCaptor> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class); Map, Map> partitionOffsets = new HashMap<>(); @@ -2049,7 +2063,7 @@ public class WorkerTest { } @Test - public void testAlterOffsetsSinkConnectorAltersAndResets() throws Exception { + public void testAlterOffsetsSinkConnectorAltersAndDeletes() throws Exception { @SuppressWarnings("unchecked") ArgumentCaptor> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); @SuppressWarnings("unchecked") @@ -2109,7 +2123,7 @@ public class WorkerTest { } FutureCallback cb = new FutureCallback<>(); - 
worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, Thread.currentThread().getContextClassLoader(), cb); assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); @@ -2145,7 +2159,7 @@ public class WorkerTest { Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100")); FutureCallback cb = new FutureCallback<>(); - worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, Thread.currentThread().getContextClassLoader(), cb); ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); @@ -2194,7 +2208,7 @@ public class WorkerTest { partitionOffsets.put(partition2, null); FutureCallback cb = new FutureCallback<>(); - worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, Thread.currentThread().getContextClassLoader(), cb); ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); @@ -2229,7 +2243,7 @@ public class WorkerTest { partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100")); FutureCallback cb = new FutureCallback<>(); - worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, Thread.currentThread().getContextClassLoader(), cb); ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); @@ -2240,6 +2254,193 @@ public class WorkerTest { verifyKafkaClusterId(); } + @Test + @SuppressWarnings("unchecked") + public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Exception { + Map workerProps = new HashMap<>(this.workerProps); + workerProps.put("exactly.once.source.support", "enabled"); + workerProps.put("bootstrap.servers", "localhost:9092"); + workerProps.put("group.id", "connect-cluster"); + workerProps.put("config.storage.topic", "connect-configs"); + workerProps.put("offset.storage.topic", "connect-offsets"); + workerProps.put("status.storage.topic", "connect-statuses"); + config = new DistributedConfig(workerProps); + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Set> connectorPartitions = new HashSet<>(); + connectorPartitions.add(Collections.singletonMap("partitionKey1", new Object())); + connectorPartitions.add(Collections.singletonMap("partitionKey2", new Object())); + when(offsetStore.connectorPartitions(eq(CONNECTOR_ID))).thenReturn(connectorPartitions); + 
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(null, null); + return null; + }); + + FutureCallback cb = new FutureCallback<>(); + worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, null, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been reset successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + InOrder inOrder = Mockito.inOrder(offsetStore, offsetWriter, producer); + inOrder.verify(offsetStore).start(); + connectorPartitions.forEach(partition -> inOrder.verify(offsetWriter).offset(partition, null)); + inOrder.verify(offsetWriter).beginFlush(); + inOrder.verify(producer).initTransactions(); + inOrder.verify(producer).beginTransaction(); + inOrder.verify(offsetWriter).doFlush(any()); + inOrder.verify(producer).commitTransaction(); + inOrder.verify(offsetStore, timeout(1000)).stop(); + verifyKafkaClusterId(); + } + + @Test + public void testResetOffsetsSinkConnector() throws Exception { + mockKafkaClusterId(); + String connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + + Admin admin = mock(Admin.class); + Time time = new MockTime(); + worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), + allConnectorClientConfigOverridePolicy, config -> admin); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + + TopicPartition tp = new TopicPartition("test-topic", 0); + mockAdminListConsumerGroupOffsets(admin, Collections.singletonMap(tp, new OffsetAndMetadata(10L)), null, time, 2000); + when(sinkConnector.alterOffsets(eq(connectorProps), eq(Collections.singletonMap(tp, null)))).thenAnswer(invocation -> { + time.sleep(3000); + return true; + }); + + DeleteConsumerGroupsResult deleteConsumerGroupsResult = mock(DeleteConsumerGroupsResult.class); + when(admin.deleteConsumerGroups(anyCollection(), any(DeleteConsumerGroupsOptions.class))).thenReturn(deleteConsumerGroupsResult); + when(deleteConsumerGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(null)); + + FutureCallback cb = new FutureCallback<>(); + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, null, + Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been reset successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + ArgumentCaptor deleteConsumerGroupsOptionsArgumentCaptor = ArgumentCaptor.forClass(DeleteConsumerGroupsOptions.class); + verify(admin).deleteConsumerGroups(anyCollection(), deleteConsumerGroupsOptionsArgumentCaptor.capture()); + // Expect the call to Admin::deleteConsumerGroups to have a timeout value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS + // minus the delay introduced in the call to Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call to + // SinkConnector::alterOffsets (3000 ms) + assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L - 3000L, + deleteConsumerGroupsOptionsArgumentCaptor.getValue().timeoutMs().intValue()); + verify(admin, timeout(1000)).close(); + verifyKafkaClusterId(); + } + + @Test + public void testResetOffsetsSinkConnectorDeleteConsumerGroupError() throws Exception { + mockKafkaClusterId(); + String 
connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + + Admin admin = mock(Admin.class); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), + allConnectorClientConfigOverridePolicy, config -> admin); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + + TopicPartition tp = new TopicPartition("test-topic", 0); + mockAdminListConsumerGroupOffsets(admin, Collections.singletonMap(tp, new OffsetAndMetadata(10L)), null); + when(sinkConnector.alterOffsets(eq(connectorProps), eq(Collections.singletonMap(tp, null)))).thenReturn(true); + + DeleteConsumerGroupsResult deleteConsumerGroupsResult = mock(DeleteConsumerGroupsResult.class); + when(admin.deleteConsumerGroups(anyCollection(), any(DeleteConsumerGroupsOptions.class))).thenReturn(deleteConsumerGroupsResult); + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new ClusterAuthorizationException("Test exception")); + when(deleteConsumerGroupsResult.all()).thenReturn(future); + + FutureCallback cb = new FutureCallback<>(); + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, null, + Thread.currentThread().getContextClassLoader(), cb); + + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); + assertEquals(ConnectException.class, e.getCause().getClass()); + + verify(admin, timeout(1000)).close(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testModifySourceConnectorOffsetsTimeout() throws Exception { + mockKafkaClusterId(); + Time time = new MockTime(); + worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> { + time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); + return true; + }); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Map, Map> partitionOffsets = Collections.singletonMap( + Collections.singletonMap("partitionKey", "partitionValue"), + Collections.singletonMap("offsetKey", "offsetValue")); + + FutureCallback cb = new FutureCallback<>(); + worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message()); + assertEquals(ConnectException.class, e.getCause().getClass()); + assertThat(e.getCause().getMessage(), containsString("Timed out")); + + verify(offsetStore).start(); + verify(offsetStore, timeout(1000)).stop(); + verifyKafkaClusterId(); + } + + @Test + public void testModifyOffsetsSinkConnectorTimeout() throws Exception { + mockKafkaClusterId(); + String connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + + Admin admin = mock(Admin.class); 
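+            // Illustrative note (not part of this change): these timeout tests lean on MockTime so
+            // that sleeps inside mocked calls consume the REST request budget deterministically.
+            // Conceptually, the worker tracks a single deadline across the whole request, e.g.:
+            //     long deadline = time.milliseconds() + ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+            //     // ... listConsumerGroupOffsets sleeps 2000 ms, alterOffsets sleeps 3000 ms ...
+            //     int remainingTimeoutMs = (int) (deadline - time.milliseconds());
+            // which is the arithmetic asserted against Admin::deleteConsumerGroups in
+            // testResetOffsetsSinkConnector above (DEFAULT_REST_REQUEST_TIMEOUT_MS is 90 seconds
+            // at the time of writing).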
+ Time time = new MockTime(); + worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), + allConnectorClientConfigOverridePolicy, config -> admin); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + + when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> { + time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); + return true; + }); + + FutureCallback cb = new FutureCallback<>(); + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, new HashMap<>(), + Thread.currentThread().getContextClassLoader(), cb); + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message()); + assertEquals(ConnectException.class, e.getCause().getClass()); + assertThat(e.getCause().getMessage(), containsString("Timed out")); + + verify(admin, timeout(1000)).close(); + verifyKafkaClusterId(); + } + private void assertStatusMetrics(long expected, String metricName) { MetricGroup statusMetrics = worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector()); if (expected == 0L) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index df1d45445ea..d603a4d5b6f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -122,6 +122,7 @@ import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CON import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.isNull; import static org.easymock.EasyMock.leq; import static org.easymock.EasyMock.newCapture; import static org.junit.Assert.assertEquals; @@ -4146,7 +4147,7 @@ public class DistributedHerderTest { } @Test - public void testAlterConnectorOffsetsUnknownConnector() throws Exception { + public void testModifyConnectorOffsetsUnknownConnector() throws Exception { // Get the initial assignment EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); @@ -4155,7 +4156,7 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - // Now handle the alter connector offsets request + // Now handle the connector offsets modification request member.wakeup(); PowerMock.expectLastCall(); member.ensureActive(); @@ -4168,7 +4169,7 @@ public class DistributedHerderTest { herder.tick(); FutureCallback callback = new FutureCallback<>(); - herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); + herder.modifyConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); herder.tick(); ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof NotFoundException); @@ -4177,7 +4178,7 @@ public class DistributedHerderTest { } @Test - public void testAlterOffsetsConnectorNotInStoppedState() throws Exception { + public void testModifyOffsetsConnectorNotInStoppedState() throws Exception { // Get the 
initial assignment EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); @@ -4186,7 +4187,7 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - // Now handle the alter connector offsets request + // Now handle the connector offsets modification request member.wakeup(); PowerMock.expectLastCall(); member.ensureActive(); @@ -4199,7 +4200,7 @@ public class DistributedHerderTest { herder.tick(); FutureCallback callback = new FutureCallback<>(); - herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + herder.modifyConnectorOffsets(CONN1, null, callback); herder.tick(); ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof BadRequestException); @@ -4208,7 +4209,7 @@ public class DistributedHerderTest { } @Test - public void testAlterOffsetsNotLeader() throws Exception { + public void testModifyOffsetsNotLeader() throws Exception { // Get the initial assignment EasyMock.expect(member.memberId()).andStubReturn("member"); EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); @@ -4217,7 +4218,7 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - // Now handle the alter connector offsets request + // Now handle the connector offsets modification request member.wakeup(); PowerMock.expectLastCall(); member.ensureActive(); @@ -4229,7 +4230,7 @@ public class DistributedHerderTest { herder.tick(); FutureCallback callback = new FutureCallback<>(); - herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + herder.modifyConnectorOffsets(CONN1, new HashMap<>(), callback); herder.tick(); ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof NotLeaderException); @@ -4238,7 +4239,7 @@ public class DistributedHerderTest { } @Test - public void testAlterOffsetsSinkConnector() throws Exception { + public void testModifyOffsetsSinkConnector() throws Exception { EasyMock.reset(herder); EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SINK).anyTimes(); PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); @@ -4253,7 +4254,9 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); // Now handle the alter connector offsets request - Map, Map> offsets = new HashMap<>(); + Map, Map> offsets = Collections.singletonMap( + Collections.singletonMap("partitionKey", "partitionValue"), + Collections.singletonMap("offsetKey", "offsetValue")); member.wakeup(); PowerMock.expectLastCall(); member.ensureActive(); @@ -4262,7 +4265,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); Capture> workerCallbackCapture = Capture.newInstance(); - worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); + worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); Message msg = new Message("The offsets for this connector have been altered successfully"); EasyMock.expectLastCall().andAnswer(() -> { workerCallbackCapture.getValue().onCompletion(null, msg); @@ -4282,7 +4285,7 @@ public class DistributedHerderTest { } @Test - public 
void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception { + public void testModifyOffsetsSourceConnectorExactlyOnceDisabled() throws Exception { // Get the initial assignment EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); @@ -4291,8 +4294,7 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - // Now handle the alter connector offsets request - Map, Map> offsets = new HashMap<>(); + // Now handle the reset connector offsets request member.wakeup(); PowerMock.expectLastCall(); member.ensureActive(); @@ -4301,8 +4303,8 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); Capture> workerCallbackCapture = Capture.newInstance(); - worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); - Message msg = new Message("The offsets for this connector have been altered successfully"); + worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), isNull(), capture(workerCallbackCapture)); + Message msg = new Message("The offsets for this connector have been reset successfully"); EasyMock.expectLastCall().andAnswer(() -> { workerCallbackCapture.getValue().onCompletion(null, msg); return null; @@ -4312,16 +4314,16 @@ public class DistributedHerderTest { herder.tick(); FutureCallback callback = new FutureCallback<>(); - herder.alterConnectorOffsets(CONN1, offsets, callback); + herder.resetConnectorOffsets(CONN1, callback); herder.tick(); assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); - assertEquals("The offsets for this connector have been altered successfully", msg.message()); + assertEquals("The offsets for this connector have been reset successfully", msg.message()); PowerMock.verifyAll(); } @Test - public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws Exception { + public void testModifyOffsetsSourceConnectorExactlyOnceEnabled() throws Exception { // Setup herder with exactly-once support for source connectors enabled herder = exactlyOnceHerder(); rebalanceListener = herder.new RebalanceListener(time); @@ -4337,7 +4339,9 @@ public class DistributedHerderTest { PowerMock.expectLastCall().anyTimes(); // Now handle the alter connector offsets request - Map, Map> offsets = new HashMap<>(); + Map, Map> offsets = Collections.singletonMap( + Collections.singletonMap("partitionKey", "partitionValue"), + Collections.singletonMap("offsetKey", "offsetValue")); member.wakeup(); PowerMock.expectLastCall().anyTimes(); member.ensureActive(); @@ -4354,7 +4358,7 @@ public class DistributedHerderTest { EasyMock.expect(workerFencingFuture.thenApply(EasyMock.>anyObject())).andReturn(herderFencingFuture); // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active - // fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to + // fencing list. The other is the callback passed from DistributedHerder::modifyConnectorOffsets in order to // queue up the actual alter offsets request if the zombie fencing succeeds. 
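            // Simplified sketch (not the actual DistributedHerder code) of the chaining these
            // expectations model:
            //     fencingFuture.whenComplete((ignored, error) -> {
            //         if (error != null)
            //             callback.onCompletion(new ConnectException("Failed to perform zombie fencing " +
            //                     "for source connector prior to modifying offsets", error), null);
            //         else
            //             addRequest(() -> worker.modifyConnectorOffsets(connName, connConfig, offsets, callback));
            //     });
            // i.e. the offsets request is only queued once zombie fencing has completed successfully.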
for (int i = 0; i < 2; i++) { Capture> herderFencingCallback = EasyMock.newCapture(); @@ -4366,7 +4370,7 @@ public class DistributedHerderTest { Capture> workerCallbackCapture = Capture.newInstance(); Message msg = new Message("The offsets for this connector have been altered successfully"); - worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); + worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); EasyMock.expectLastCall().andAnswer(() -> { workerCallbackCapture.getValue().onCompletion(null, msg); return null; @@ -4374,13 +4378,13 @@ public class DistributedHerderTest { // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we // already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is - // refreshed once at the beginning of the DistributedHerder::alterConnectorOffsets method, once before checking + // refreshed once at the beginning of the DistributedHerder::modifyConnectorOffsets method, once before checking // whether zombie fencing is required, and once before actually proceeding to alter connector offsets. expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); Capture> workerCallbackCapture2 = Capture.newInstance(); - worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2)); + worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2)); EasyMock.expectLastCall().andAnswer(() -> { workerCallbackCapture2.getValue().onCompletion(null, msg); return null; @@ -4406,7 +4410,7 @@ public class DistributedHerderTest { } @Test - public void testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception { + public void testModifyOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception { // Setup herder with exactly-once support for source connectors enabled herder = exactlyOnceHerder(); rebalanceListener = herder.new RebalanceListener(time); @@ -4421,7 +4425,7 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall().anyTimes(); - // Now handle the alter connector offsets request + // Now handle the reset connector offsets request member.wakeup(); PowerMock.expectLastCall().anyTimes(); member.ensureActive(); @@ -4437,8 +4441,8 @@ public class DistributedHerderTest { EasyMock.expect(workerFencingFuture.thenApply(EasyMock.>anyObject())).andReturn(herderFencingFuture); // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active - // fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to - // queue up the actual alter offsets request if the zombie fencing succeeds. + // fencing list. The other is the callback passed from DistributedHerder::modifyConnectorOffsets in order to + // queue up the actual reset offsets request if the zombie fencing succeeds. 
for (int i = 0; i < 2; i++) { Capture> herderFencingCallback = EasyMock.newCapture(); EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(() -> { @@ -4451,14 +4455,14 @@ public class DistributedHerderTest { herder.tick(); FutureCallback callback = new FutureCallback<>(); - herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + herder.resetConnectorOffsets(CONN1, callback); // Process the zombie fencing request herder.tick(); - // Process the alter offsets request + // Process the reset offsets request herder.tick(); ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); assertEquals(ConnectException.class, e.getCause().getClass()); - assertEquals("Failed to perform zombie fencing for source connector prior to altering offsets", + assertEquals("Failed to perform zombie fencing for source connector prior to modifying offsets", e.getCause().getMessage()); PowerMock.verifyAll(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index e202f35bd9f..0183e251600 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -838,6 +838,38 @@ public class ConnectorsResourceTest { assertEquals(msg, response.getEntity()); } + @Test + public void testResetOffsetsNotLeader() throws Throwable { + final ArgumentCaptor> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture()); + + when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/offsets?forward=true"), eq("DELETE"), isNull(), isNull(), any())) + .thenReturn(new RestClient.HttpResponse<>(200, new HashMap<>(), new Message(""))); + connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME); + } + + @Test + public void testResetOffsetsConnectorNotFound() { + final ArgumentCaptor> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("Connector not found")) + .when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture()); + + assertThrows(NotFoundException.class, () -> connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME)); + } + + @Test + public void testResetOffsets() throws Throwable { + final ArgumentCaptor> cb = ArgumentCaptor.forClass(Callback.class); + Message msg = new Message("The offsets for this connector have been reset successfully"); + doAnswer(invocation -> { + cb.getValue().onCompletion(null, msg); + return null; + }).when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture()); + Response response = connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME); + assertEquals(200, response.getStatus()); + assertEquals(msg, response.getEntity()); + } + private byte[] serializeAsBytes(final T value) throws IOException { return new ObjectMapper().writeValueAsBytes(value); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index a32e813d0af..32c54f58035 100644 --- 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index a32e813d0af..32c54f58035 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -90,6 +90,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.isNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
@@ -986,19 +987,26 @@ public class StandaloneHerderTest {
     }
 
     @Test
-    public void testAlterConnectorOffsetsUnknownConnector() {
+    public void testModifyConnectorOffsetsUnknownConnector() {
         PowerMock.replayAll();
 
         FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>();
-        herder.alterConnectorOffsets("unknown-connector", new HashMap<>(), alterOffsetsCallback);
+        herder.alterConnectorOffsets("unknown-connector",
+                Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+                alterOffsetsCallback);
         ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(e.getCause() instanceof NotFoundException);
 
+        FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
+        herder.resetConnectorOffsets("unknown-connector", resetOffsetsCallback);
+        e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testAlterConnectorOffsetsConnectorNotInStoppedState() {
+    public void testModifyConnectorOffsetsConnectorNotInStoppedState() {
         PowerMock.replayAll();
 
         herder.configState = new ClusterConfigState(
@@ -1013,10 +1021,19 @@ public class StandaloneHerderTest {
             Collections.emptySet(),
             Collections.emptySet()
         );
+
         FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>();
-        herder.alterConnectorOffsets(CONNECTOR_NAME, new HashMap<>(), alterOffsetsCallback);
+        herder.alterConnectorOffsets(CONNECTOR_NAME,
+                Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+                alterOffsetsCallback);
         ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(e.getCause() instanceof BadRequestException);
+
+        FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
+        herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback);
+        e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
         PowerMock.verifyAll();
     }
 
@@ -1024,7 +1041,7 @@
     public void testAlterConnectorOffsets() throws Exception {
         Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
         Message msg = new Message("The offsets for this connector have been altered successfully");
-        worker.alterConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), anyObject(Map.class), capture(workerCallbackCapture));
+        worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), anyObject(Map.class), capture(workerCallbackCapture));
         EasyMock.expectLastCall().andAnswer(() -> {
             workerCallbackCapture.getValue().onCompletion(null, msg);
             return null;
@@ -1044,11 +1061,42 @@ public class StandaloneHerderTest {
             Collections.emptySet()
         );
         FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>();
-        herder.alterConnectorOffsets(CONNECTOR_NAME, new HashMap<>(), alterOffsetsCallback);
+        herder.alterConnectorOffsets(CONNECTOR_NAME,
+                Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+                alterOffsetsCallback);
         assertEquals(msg, alterOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
 
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testResetConnectorOffsets() throws Exception {
+        Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
+        Message msg = new Message("The offsets for this connector have been reset successfully");
+        worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), isNull(), capture(workerCallbackCapture));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+        PowerMock.replayAll();
+
+        herder.configState = new ClusterConfigState(
+            10,
+            null,
+            Collections.singletonMap(CONNECTOR_NAME, 0),
+            Collections.singletonMap(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE)),
+            Collections.singletonMap(CONNECTOR_NAME, TargetState.STOPPED),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+        FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
+        herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback);
+        assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectAdd(SourceSink sourceSink) {
         Map<String, String> connectorProps = connectorConfig(sourceSink);
         ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
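Taken together, these standalone-herder tests pin down a small contract: an unknown connector fails with a not-found error, a connector that is not in the STOPPED state fails with a bad-request error, and a null offsets map passed down to `Worker::modifyConnectorOffsets` means "reset" while a non-empty map means "alter". A rough sketch of that contract, using simplified stand-in types rather than the real `StandaloneHerder`:

```java
import java.util.Collections;
import java.util.Map;

public class ModifyOffsetsValidationSketch {
    enum TargetState { STARTED, STOPPED }
    interface Callback<T> { void onCompletion(Throwable error, T result); }

    // Local stand-ins for the runtime's REST exception types
    static class NotFoundException extends RuntimeException {
        NotFoundException(String msg) { super(msg); }
    }
    static class BadRequestException extends RuntimeException {
        BadRequestException(String msg) { super(msg); }
    }

    private final Map<String, TargetState> connectorStates;

    ModifyOffsetsValidationSketch(Map<String, TargetState> connectorStates) {
        this.connectorStates = connectorStates;
    }

    // offsets == null means "reset all offsets"; a non-empty map means "alter".
    void modifyConnectorOffsets(String name, Map<Map<String, ?>, Map<String, ?>> offsets,
                                Callback<String> cb) {
        TargetState state = connectorStates.get(name);
        if (state == null) {
            cb.onCompletion(new NotFoundException("Connector " + name + " not found"), null);
            return;
        }
        if (state != TargetState.STOPPED) {
            cb.onCompletion(new BadRequestException(
                    "Connectors must be in the STOPPED state before their offsets can be modified"), null);
            return;
        }
        String action = offsets == null ? "reset" : "altered";
        cb.onCompletion(null, "The offsets for this connector have been " + action + " successfully");
    }

    public static void main(String[] args) {
        ModifyOffsetsValidationSketch herder = new ModifyOffsetsValidationSketch(
                Collections.singletonMap("conn1", TargetState.STOPPED));
        herder.modifyConnectorOffsets("conn1", null, (e, msg) -> System.out.println(msg));
        herder.modifyConnectorOffsets("unknown", null, (e, msg) -> System.out.println(e.getMessage()));
    }
}
```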
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index f3466c37b7e..36e0fc765a0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -647,7 +647,8 @@ public class EmbeddedConnectCluster {
 
     /**
      * Get the offsets for a connector via the GET /connectors/{connector}/offsets endpoint
-     * @param connectorName name of the connector
+     *
+     * @param connectorName name of the connector whose offsets are to be retrieved
      * @return the connector's offsets
      */
     public ConnectorOffsets connectorOffsets(String connectorName) {
@@ -668,7 +669,8 @@ public class EmbeddedConnectCluster {
 
     /**
      * Alter a connector's offsets via the PATCH /connectors/{connector}/offsets endpoint
-     * @param connectorName name of the connector
+     *
+     * @param connectorName name of the connector whose offsets are to be altered
      * @param offsets offsets to alter
      */
     public String alterConnectorOffsets(String connectorName, ConnectorOffsets offsets) {
@@ -686,7 +688,23 @@ public class EmbeddedConnectCluster {
             return responseToString(response);
         } else {
             throw new ConnectRestException(response.getStatus(),
-                    "Could not execute PATCH request. Error response: " + responseToString(response));
+                    "Could not alter connector offsets. Error response: " + responseToString(response));
         }
     }
 
+    /**
+     * Reset a connector's offsets via the DELETE /connectors/{connector}/offsets endpoint
+     *
+     * @param connectorName name of the connector whose offsets are to be reset
+     */
+    public String resetConnectorOffsets(String connectorName) {
+        String url = endpointForResource(String.format("connectors/%s/offsets", connectorName));
+        Response response = requestDelete(url);
+        if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+            return responseToString(response);
+        } else {
+            throw new ConnectRestException(response.getStatus(),
+                    "Could not reset connector offsets. Error response: " + responseToString(response));
+        }
+    }
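For completeness, the endpoint wrapped by the new `resetConnectorOffsets` helper can also be exercised directly over HTTP. The sketch below issues the DELETE request with `java.net.http`; the worker address and connector name are placeholders, and the connector is assumed to already be in the STOPPED state (otherwise, as the tests above show, the request is rejected as a bad request):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ResetOffsetsClient {
    public static void main(String[] args) throws Exception {
        String workerUrl = "http://localhost:8083"; // assumed worker address
        String connector = "my-connector";          // assumed connector name

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(workerUrl + "/connectors/" + connector + "/offsets"))
                .DELETE()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // A 2xx status indicates the reset succeeded; the body carries a JSON message,
        // e.g. {"message":"The offsets for this connector have been reset successfully"}
        System.out.println(response.statusCode() + ": " + response.body());
    }
}
```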