KAFKA-14784: Connect offset reset REST API (#13818)

Reviewers: Chris Egerton <chrise@aiven.io>
Yash Mayya 2023-06-23 18:27:46 +01:00 committed by GitHub
parent 1dbcb7da9e
commit 6e72986949
17 changed files with 1118 additions and 272 deletions

View File

@@ -169,7 +169,7 @@
               files="(RequestResponse|WorkerSinkTask)Test.java"/>
     <suppress checks="JavaNCSS"
-              files="DistributedHerderTest.java"/>
+              files="(DistributedHerder|Worker)Test.java"/>
     <!-- Raft -->
     <suppress checks="NPathComplexity"

View File

@@ -54,19 +54,26 @@ public abstract class SinkConnector extends Connector {
      * User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected in the offsets
      * for this connector's consumer group.
      * <p>
+     * Note that altering / resetting offsets is expected to be an idempotent operation and this method should be able
+     * to handle being called more than once with the same arguments (which could occur if a user retries the request
+     * due to a failure in altering the consumer group offsets, for example).
+     * <p>
      * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
      * {@link #start(Map) start} method is invoked.
      *
      * @param connectorConfig the configuration of the connector
      * @param offsets a map from topic partition to offset, containing the offsets that the user has requested to
      *                alter/reset. For any topic partitions whose offsets are being reset instead of altered, their
-     *                corresponding value in the map will be {@code null}.
+     *                corresponding value in the map will be {@code null}. This map may be empty, but never null. An
+     *                empty offsets map could indicate that the offsets were reset previously or that no offsets have
+     *                been committed yet.
      * @return whether this method has been overridden by the connector; the default implementation returns
      * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
      * {@code true}
      * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
      * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
      * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
+     * @since 3.6
      */
     public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
         return false;

View File

@@ -85,19 +85,26 @@ public abstract class SourceConnector extends Connector {
      * returned by any {@link org.apache.kafka.connect.storage.OffsetStorageReader OffsetStorageReader instances}
      * provided to this connector and its tasks.
      * <p>
+     * Note that altering / resetting offsets is expected to be an idempotent operation and this method should be able
+     * to handle being called more than once with the same arguments (which could occur if a user retries the request
+     * due to a failure in writing the new offsets to the offsets store, for example).
+     * <p>
      * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
      * {@link #start(Map) start} method is invoked.
      *
      * @param connectorConfig the configuration of the connector
      * @param offsets a map from source partition to source offset, containing the offsets that the user has requested
      *                to alter/reset. For any source partitions whose offsets are being reset instead of altered, their
-     *                corresponding source offset value in the map will be {@code null}
+     *                corresponding source offset value in the map will be {@code null}. This map may be empty, but
+     *                never null. An empty offsets map could indicate that the offsets were reset previously or that no
+     *                offsets have been committed yet.
      * @return whether this method has been overridden by the connector; the default implementation returns
      * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
      * {@code true}
      * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
      * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
      * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
+     * @since 3.6
      */
     public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
         return false;

View File

@@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.runtime.rest.entities.Message;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -902,4 +903,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             cb.onCompletion(t, null);
         }
     }
+
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        if (offsets == null || offsets.isEmpty()) {
+            callback.onCompletion(new ConnectException("The offsets to be altered may not be null or empty"), null);
+            return;
+        }
+        modifyConnectorOffsets(connName, offsets, callback);
+    }
+
+    @Override
+    public void resetConnectorOffsets(String connName, Callback<Message> callback) {
+        modifyConnectorOffsets(connName, null, callback);
+    }
+
+    /**
+     * Service external requests to alter or reset connector offsets.
+     * @param connName the name of the connector whose offsets are to be modified
+     * @param offsets the offsets to be written; this should be {@code null} for offsets reset requests
+     * @param cb callback to invoke upon completion
+     */
+    protected abstract void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
 }
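Callers interact with these entry points through the asynchronous Callback API. A hedged sketch of driving the new reset path directly against any Herder implementation (connector name and timeout are placeholders; assumes the Message entity exposes its text via message()):

    FutureCallback<Message> cb = new FutureCallback<>();
    herder.resetConnectorOffsets("my-connector", cb);
    // Blocks until the herder completes the request or the timeout elapses
    Message result = cb.get(90, TimeUnit.SECONDS);
    System.out.println(result.message());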

View File

@@ -311,6 +311,13 @@ public interface Herder {
      */
     void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);

+    /**
+     * Reset a connector's offsets.
+     * @param connName the name of the connector whose offsets are to be reset
+     * @param cb callback to invoke upon completion
+     */
+    void resetConnectorOffsets(String connName, Callback<Message> cb);
+
     enum ConfigReloadAction {
         NONE,
         RESTART

View File

@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.FenceProducersOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
@@ -38,10 +39,13 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
@@ -92,6 +96,7 @@ import org.apache.kafka.connect.util.TopicCreationGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1201,6 +1206,7 @@ public class Worker {
                 });
             } catch (Throwable t) {
                 Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+                log.error("Failed to retrieve consumer group offsets for sink connector {}", connName, t);
                 cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, t), null);
             }
         }
@@ -1236,7 +1242,8 @@ public class Worker {
                     .collect(Collectors.toList());
             cb.onCompletion(null, new ConnectorOffsets(connectorOffsets));
         } catch (Throwable t) {
-            cb.onCompletion(t, null);
+            log.error("Failed to retrieve offsets for source connector {}", connName, t);
+            cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to retrieve offsets for source connector " + connName), null);
         } finally {
             Utils.closeQuietly(offsetReader, "Offset reader for connector " + connName);
             Utils.closeQuietly(offsetStore::stop, "Offset store for connector " + connName);
@@ -1245,21 +1252,17 @@ public class Worker {
     }

     /**
-     * Alter a connector's offsets.
+     * Modify (alter / reset) a connector's offsets.
      *
-     * @param connName the name of the connector whose offsets are to be altered
+     * @param connName the name of the connector whose offsets are to be modified
      * @param connectorConfig the connector's configurations
      * @param offsets a mapping from partitions (either source partitions for source connectors, or Kafka topic
-     *                partitions for sink connectors) to offsets that need to be written; may not be null or empty
+     *                partitions for sink connectors) to offsets that need to be written; this should be {@code null}
+     *                for offsets reset requests
      * @param cb callback to invoke upon completion
      */
-    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
-                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
-        if (offsets == null || offsets.isEmpty()) {
-            throw new ConnectException("The offsets to be altered may not be null or empty");
-        }
+    public void modifyConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                       Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
         String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
         ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
         Connector connector;
@@ -1267,40 +1270,34 @@ public class Worker {
         try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
             connector = plugins.newConnector(connectorClassOrAlias);
             if (ConnectUtils.isSinkConnector(connector)) {
-                log.debug("Altering consumer group offsets for sink connector: {}", connName);
-                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                log.debug("Modifying offsets for sink connector: {}", connName);
+                modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
             } else {
-                log.debug("Altering offsets for source connector: {}", connName);
-                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                log.debug("Modifying offsets for source connector: {}", connName);
+                modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
             }
         }
     }

     /**
-     * Alter a sink connector's consumer group offsets.
+     * Modify (alter / reset) a sink connector's consumer group offsets.
      * <p>
      * Visible for testing.
      *
-     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connName the name of the sink connector whose offsets are to be modified
      * @param connector an instance of the sink connector
      * @param connectorConfig the sink connector's configuration
-     * @param offsets a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param offsets a mapping from topic partitions to offsets that need to be written; this should be {@code null}
+     *                for offsets reset requests
      * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
      * @param cb callback to invoke upon completion
      */
-    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
-                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+    void modifySinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                    Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
         executor.submit(plugins.withClassLoader(connectorLoader, () -> {
             try {
-                Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(offsets);
-                boolean alterOffsetsResult;
-                try {
-                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
-                } catch (UnsupportedOperationException e) {
-                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
-                            "modification of offsets", e);
-                }
+                Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
+                boolean isReset = offsets == null;
                 SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
                 Class<? extends Connector> sinkConnectorClass = connector.getClass();
                 Map<String, Object> adminConfig = adminConfigs(
@@ -1320,89 +1317,192 @@ public class Worker {
                 Admin admin = adminFactory.apply(adminConfig);
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
-                    }
-
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
-                    }
-
-                    @SuppressWarnings("rawtypes")
-                    KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
-                    compositeAdminFuture.whenComplete((ignored, error) -> {
-                        if (error != null) {
-                            // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
-                            // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
-                            // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
-                            // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
-                            if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
-                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
-                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
-                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
-                                        null);
-                            } else {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
-                            }
-                        } else {
-                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
-                        }
-                    }).whenComplete((ignored, ignoredError) -> {
-                        // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
-                        // an exception itself, and we can thus ignore the error here
-                        Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
-                    });
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                                .timeoutMs((int) timer.remainingMs());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(timer.remainingMs(), TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+                            timer.update();
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting offsets for sink connector {}", connName, e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
+                    }
+
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets",
+                                connName, e);
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
+                    updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for sink connector " + connName);
+
+                    if (isReset) {
+                        resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult, timer);
+                    } else {
+                        alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult, timer);
+                    }
                 } catch (Throwable t) {
-                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName);
                     throw t;
                 }
             } catch (Throwable t) {
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+                log.error("Failed to modify offsets for sink connector {}", connName, t);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null);
             }
         }));
     }

     /**
-     * Alter a source connector's offsets.
+     * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets}
+     * and / or {@link Admin#deleteConsumerGroupOffsets}.
      *
-     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; will be closed after use
+     * @param offsetsToWrite a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param cb callback to invoke upon completion
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector
+     * @param timer {@link Timer} to bound the total runtime of admin client requests
+     */
+    private void alterSinkConnectorOffsets(String connName, String groupId, Admin admin, Map<TopicPartition, Long> offsetsToWrite,
+                                           Callback<Message> cb, boolean alterOffsetsResult, Timer timer) {
+        List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
+
+        Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+
+        if (!offsetsToAlter.isEmpty()) {
+            log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
+                    connName, offsetsToAlter);
+            AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) timer.remainingMs());
+            AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                    alterConsumerGroupOffsetsOptions);
+            adminFutures.add(alterConsumerGroupOffsetsResult.all());
+        }
+
+        Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() == null)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+        if (!partitionsToReset.isEmpty()) {
+            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                    connName, partitionsToReset);
+            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) timer.remainingMs());
+            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                    deleteConsumerGroupOffsetsOptions);
+            adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+        }
+
+        @SuppressWarnings("rawtypes")
+        KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
+        compositeAdminFuture.whenComplete((ignored, error) -> {
+            if (error != null) {
+                // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
+                // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
+                // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
+                if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
+                    String errorMsg = "Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                            "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                            "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                            "Connect cluster may need to be restarted to get rid of the zombie sink tasks.";
+                    log.error(errorMsg, error);
+                    cb.onCompletion(new ConnectException(errorMsg, error), null);
+                } else {
+                    log.error("Failed to alter consumer group offsets for connector {}", connName, error);
+                    cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
+                }
+            } else {
+                completeModifyOffsetsCallback(alterOffsetsResult, false, cb);
+            }
+        }).whenComplete((ignored, ignoredError) -> {
+            // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
+            // an exception itself, and we can thus ignore the error here
+            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+        });
+    }
+
+    /**
+     * Reset a sink connector's consumer group offsets. This is done by deleting the consumer group via a call to
+     * {@link Admin#deleteConsumerGroups}
+     *
+     * @param connName the name of the sink connector whose offsets are to be reset
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for resetting the consumer group offsets; will be closed after use
+     * @param cb callback to invoke upon completion
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector
+     * @param timer {@link Timer} to bound the total runtime of admin client requests
+     */
+    private void resetSinkConnectorOffsets(String connName, String groupId, Admin admin, Callback<Message> cb, boolean alterOffsetsResult, Timer timer) {
+        DeleteConsumerGroupsOptions deleteConsumerGroupsOptions = new DeleteConsumerGroupsOptions().timeoutMs((int) timer.remainingMs());
+        admin.deleteConsumerGroups(Collections.singleton(groupId), deleteConsumerGroupsOptions)
+                .all()
+                .whenComplete((ignored, error) -> {
+                    // We treat GroupIdNotFoundException as a non-error here because resetting a connector's offsets is expected to be an idempotent operation
+                    // and the consumer group could have already been deleted in a prior offsets reset request
+                    if (error != null && !(error instanceof GroupIdNotFoundException)) {
+                        // When a consumer group is non-empty, attempts to delete it via the admin client result in a GroupNotEmptyException. This can occur
+                        // if the sink tasks haven't stopped completely or if the connector is resumed while the reset offsets request is being processed
+                        if (error instanceof GroupNotEmptyException) {
+                            String errorMsg = "Failed to reset consumer group offsets for connector " + connName + " either because its tasks " +
+                                    "haven't stopped completely yet or the connector was resumed before the request to reset its offsets could be successfully " +
+                                    "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                    "Connect cluster may need to be restarted to get rid of the zombie sink tasks.";
+                            log.error(errorMsg, error);
+                            cb.onCompletion(new ConnectException(errorMsg, error), null);
+                        } else {
+                            log.error("Failed to reset consumer group offsets for sink connector {}", connName, error);
+                            cb.onCompletion(new ConnectException("Failed to reset consumer group offsets for sink connector " + connName, error), null);
+                        }
+                    } else {
+                        completeModifyOffsetsCallback(alterOffsetsResult, true, cb);
+                    }
+                }).whenComplete((ignored, ignoredError) -> {
+                    // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
+                    // an exception itself, and we can thus ignore the error here
+                    Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                });
+    }
+
+    /**
+     * Modify (alter / reset) a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be modified
      * @param connector an instance of the source connector
      * @param connectorConfig the source connector's configuration
-     * @param offsets a mapping from partitions to offsets that need to be written; may not be null or empty
+     * @param offsets a mapping from partitions to offsets that need to be written; this should be {@code null} for
+     *                offsets reset requests
      * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
      * @param cb callback to invoke upon completion
      */
-    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
-                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+    private void modifySourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                              Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
         SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
         Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
                 ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
@@ -1417,29 +1517,60 @@ public class Worker {
         offsetStore.configure(config);
         OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
-        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+        modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
     }

     // Visible for testing
-    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
-                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
-                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
-                                     ClassLoader connectorLoader, Callback<Message> cb) {
+    void modifySourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                      KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                      ClassLoader connectorLoader, Callback<Message> cb) {
         executor.submit(plugins.withClassLoader(connectorLoader, () -> {
             try {
-                boolean alterOffsetsResult;
-                try {
-                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
-                } catch (UnsupportedOperationException e) {
-                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
-                            "modification of offsets", e);
-                }
+                Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
                 // This reads to the end of the offsets topic and can be a potentially time-consuming operation
                 offsetStore.start();
+                updateTimerAndCheckExpiry(timer, "Timed out while trying to read to the end of the offsets topic prior to modifying " +
+                        "offsets for source connector " + connName);

-                // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
+                Map<Map<String, ?>, Map<String, ?>> offsetsToWrite;
+                // If the offsets argument is null, it indicates an offsets reset operation - i.e. a null offset should
+                // be written for every source partition of the connector
+                boolean isReset;
+                if (offsets == null) {
+                    isReset = true;
+                    offsetsToWrite = new HashMap<>();
+                    offsetStore.connectorPartitions(connName).forEach(partition -> offsetsToWrite.put(partition, null));
+                    log.debug("Found the following partitions (to reset offsets) for source connector {}: {}", connName, offsetsToWrite.keySet());
+                } else {
+                    isReset = false;
+                    offsetsToWrite = offsets;
+                }
+
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                } catch (UnsupportedOperationException e) {
+                    log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets",
+                            connName, e);
+                    throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+                updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for source connector " + connName);
+
+                // This should only occur for an offsets reset request when there are no source partitions found for the source connector in the
+                // offset store - either because there was a prior attempt to reset offsets or if there are no offsets committed by this source
+                // connector so far
+                if (offsetsToWrite.isEmpty()) {
+                    log.info("No offsets found for source connector {} - this can occur due to a prior attempt to reset offsets or if the " +
+                            "source connector hasn't committed any offsets yet", connName);
+                    completeModifyOffsetsCallback(alterOffsetsResult, isReset, cb);
+                    return;
+                }
+
+                // The modifySourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
                 // safe to write offsets via an offset writer
-                offsets.forEach(offsetWriter::offset);
+                offsetsToWrite.forEach(offsetWriter::offset);

                 // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
                 // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
@@ -1450,7 +1581,7 @@ public class Worker {
                     producer.initTransactions();
                     producer.beginTransaction();
                 }
-                log.debug("Committing the following offsets for source connector {}: {}", connName, offsets);
+                log.debug("Committing the following offsets for source connector {}: {}", connName, offsetsToWrite);
                 FutureCallback<Void> offsetWriterCallback = new FutureCallback<>();
                 offsetWriter.doFlush(offsetWriterCallback);
                 if (config.exactlyOnceSourceEnabled()) {
@@ -1458,32 +1589,58 @@ public class Worker {
                 }

                 try {
-                    offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                    offsetWriterCallback.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
                 } catch (ExecutionException e) {
-                    throw new ConnectException("Failed to alter offsets for source connector " + connName, e.getCause());
+                    throw new ConnectException("Failed to modify offsets for source connector " + connName, e.getCause());
                 } catch (TimeoutException e) {
-                    throw new ConnectException("Timed out while attempting to alter offsets for source connector " + connName, e);
+                    throw new ConnectException("Timed out while attempting to modify offsets for source connector " + connName, e);
                 } catch (InterruptedException e) {
-                    throw new ConnectException("Unexpectedly interrupted while attempting to alter offsets for source connector " + connName, e);
+                    throw new ConnectException("Unexpectedly interrupted while attempting to modify offsets for source connector " + connName, e);
                 }

-                completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                completeModifyOffsetsCallback(alterOffsetsResult, isReset, cb);
             } catch (Throwable t) {
-                log.error("Failed to alter offsets for source connector {}", connName, t);
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for source connector " + connName), null);
+                log.error("Failed to modify offsets for source connector {}", connName, t);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for source connector " + connName), null);
             } finally {
-                Utils.closeQuietly(offsetStore::stop, "Offset store for offset alter request for connector " + connName);
+                Utils.closeQuietly(offsetStore::stop, "Offset store for offset modification request for connector " + connName);
             }
         }));
     }

-    private void completeAlterOffsetsCallback(boolean alterOffsetsResult, Callback<Message> cb) {
+    /**
+     * Update the provided timer, check if it's expired and throw a {@link ConnectException} with the provided error
+     * message if it is.
+     *
+     * @param timer {@link Timer} to check
+     * @param errorMessageIfExpired error message indicating the cause for the timer expiry
+     * @throws ConnectException if the timer has expired
+     */
+    private void updateTimerAndCheckExpiry(Timer timer, String errorMessageIfExpired) {
+        timer.update();
+        if (timer.isExpired()) {
+            log.error(errorMessageIfExpired);
+            throw new ConnectException(errorMessageIfExpired);
+        }
+    }
+
+    /**
+     * Complete the alter / reset offsets callback with a potential-success or a definite-success message.
+     *
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} / {@link SourceConnector#alterOffsets}
+     * @param isReset whether this callback is for an offsets reset operation
+     * @param cb the callback to complete
+     *
+     * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect">KIP-875</a>
+     */
+    private void completeModifyOffsetsCallback(boolean alterOffsetsResult, boolean isReset, Callback<Message> cb) {
+        String modificationType = isReset ? "reset" : "altered";
         if (alterOffsetsResult) {
-            cb.onCompletion(null, new Message("The offsets for this connector have been altered successfully"));
+            cb.onCompletion(null, new Message("The offsets for this connector have been " + modificationType + " successfully"));
         } else {
             cb.onCompletion(null, new Message("The Connect framework-managed offsets for this connector have been " +
-                    "altered successfully. However, if this connector manages offsets externally, they will need to be " +
-                    "manually altered in the system that the connector uses."));
+                    modificationType + " successfully. However, if this connector manages offsets externally, they will need to be " +
+                    "manually " + modificationType + " in the system that the connector uses."));
         }
     }
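The Timer plumbing above gives every modify-offsets request a single time budget: one Timer is created from DEFAULT_REST_REQUEST_TIMEOUT_MS, updated after each blocking phase via updateTimerAndCheckExpiry, and its remainingMs() is passed to each subsequent admin or offset-writer call, so the phases share (rather than each restart) the REST timeout. A standalone sketch of the pattern, assuming the 90-second Connect REST default (the phase methods are placeholders):

    import java.time.Duration;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.common.utils.Timer;
    import org.apache.kafka.connect.errors.ConnectException;

    public class SharedTimeoutSketch {
        public static void main(String[] args) {
            // One budget for the whole request; 90s mirrors the assumed REST default
            Timer timer = Time.SYSTEM.timer(Duration.ofMillis(90_000));

            phaseOne(timer.remainingMs());   // e.g. read to the end of the offsets topic
            checkExpiry(timer, "Timed out in phase one");

            phaseTwo(timer.remainingMs());   // e.g. the connector's alterOffsets hook
            checkExpiry(timer, "Timed out in phase two");

            phaseThree(timer.remainingMs()); // e.g. admin / offset-writer calls get what is left
        }

        // Mirrors Worker#updateTimerAndCheckExpiry from this commit
        static void checkExpiry(Timer timer, String msg) {
            timer.update();
            if (timer.isExpired()) {
                throw new ConnectException(msg);
            }
        }

        static void phaseOne(long timeoutMs) { /* placeholder */ }
        static void phaseTwo(long timeoutMs) { /* placeholder */ }
        static void phaseThree(long timeoutMs) { /* placeholder */ }
    }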

View File

@@ -1522,60 +1522,59 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }

     @Override
-    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
-        log.trace("Submitting alter offsets request for connector '{}'", connName);
+    protected void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting {} offsets request for connector '{}'", offsets == null ? "reset" : "alter", connName);

         addRequest(() -> {
-            if (!alterConnectorOffsetsChecks(connName, callback)) {
+            if (!modifyConnectorOffsetsChecks(connName, callback)) {
                 return null;
             }
-            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // At this point, we should be the leader (the call to modifyConnectorOffsetsChecks makes sure of that) and can safely run
             // a zombie fencing request
             if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
-                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once support enabled.", connName);
+                log.debug("Performing a round of zombie fencing before modifying offsets for source connector {} with exactly-once support enabled.", connName);
                 doFenceZombieSourceTasks(connName, (error, ignored) -> {
                     if (error != null) {
-                        log.error("Failed to perform zombie fencing for source connector prior to altering offsets", error);
-                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for source connector prior to altering offsets",
-                                error), null);
+                        log.error("Failed to perform zombie fencing for source connector prior to modifying offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for source connector prior to modifying offsets", error), null);
                     } else {
-                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
-                        // We need to ensure that we perform the necessary checks again before proceeding to actually altering the connector offsets since
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to modify offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again before proceeding to actually altering / resetting the connector offsets since
                         // zombie fencing is done asynchronously and the conditions could have changed since the previous check
                         addRequest(() -> {
-                            if (alterConnectorOffsetsChecks(connName, callback)) {
-                                worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
+                            if (modifyConnectorOffsetsChecks(connName, callback)) {
+                                worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
                             }
                             return null;
                         }, forwardErrorCallback(callback));
                     }
                 });
             } else {
-                worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
+                worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
             }
             return null;
         }, forwardErrorCallback(callback));
     }

     /**
-     * This method performs a few checks for alter connector offsets request and completes the callback exceptionally
-     * if any check fails.
-     * @param connName the name of the connector whose offsets are to be altered
+     * This method performs a few checks for external requests to modify (alter or reset) connector offsets and
+     * completes the callback exceptionally if any check fails.
+     * @param connName the name of the connector whose offsets are to be modified
      * @param callback callback to invoke upon completion
      * @return true if all the checks passed, false otherwise
      */
-    private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> callback) {
+    private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> callback) {
         if (checkRebalanceNeeded(callback)) {
             return false;
         }

         if (!isLeader()) {
-            callback.onCompletion(new NotLeaderException("Only the leader can process alter offsets requests", leaderUrl()), null);
+            callback.onCompletion(new NotLeaderException("Only the leader can process external offsets modification requests", leaderUrl()), null);
             return false;
         }

         if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
-            throw new ConnectException("Failed to read to end of config topic before altering connector offsets");
+            throw new ConnectException("Failed to read to end of config topic before modifying connector offsets");
         }

         if (!configState.contains(connName)) {
@@ -1586,10 +1585,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above),
         // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop).
         // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled
-        // naturally because requests to alter consumer group offsets will fail if there are still active members in the group.
+        // naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members
+        // in the group.
         if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
-            callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " +
-                    "can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
+            callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified. This can be done " +
+                    "for the specified connector by issuing a 'PUT' request to the '/connectors/" + connName + "/stop' endpoint"), null);
             return false;
         }

         return true;

View File

@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.entities;

 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;

 import java.util.HashMap;
 import java.util.List;
@@ -45,6 +46,9 @@ import java.util.Objects;
  *     ]
  * }
  * </pre>
+ *
+ * @see ConnectorsResource#getOffsets
+ * @see ConnectorsResource#alterConnectorOffsets
  */
 public class ConnectorOffsets {
     private final List<ConnectorOffset> offsets;

View File

@@ -367,6 +367,18 @@ public class ConnectorsResource implements ConnectResource {
         return Response.ok().entity(msg).build();
     }

+    @DELETE
+    @Path("/{connector}/offsets")
+    @Operation(summary = "Reset the offsets for the specified connector")
+    public Response resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
+                                          final @Context HttpHeaders headers, final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<Message> cb = new FutureCallback<>();
+        herder.resetConnectorOffsets(connector, cb);
+        Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null,
+                new TypeReference<Message>() { }, new IdentityTranslator<>(), forward);
+        return Response.ok().entity(msg).build();
+    }
+
     // Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
     // object. Throw BadRequestException on mismatch, otherwise put connectorName in config
     private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {

View File

@@ -374,19 +374,34 @@ public class StandaloneHerder extends AbstractHerder {
     }

     @Override
-    public synchronized void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+    protected synchronized void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        if (!modifyConnectorOffsetsChecks(connName, cb)) {
+            return;
+        }
+        worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
+    }
+
+    /**
+     * This method performs a few checks for external requests to modify (alter or reset) connector offsets and
+     * completes the callback exceptionally if any check fails.
+     * @param connName the name of the connector whose offsets are to be modified
+     * @param cb callback to invoke upon completion
+     * @return true if all the checks passed, false otherwise
+     */
+    private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> cb) {
         if (!configState.contains(connName)) {
             cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
-            return;
+            return false;
         }

         if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
-            cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. " +
-                    "This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
-            return;
+            cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified. This can be done " +
+                    "for the specified connector by issuing a 'PUT' request to the '/connectors/" + connName + "/stop' endpoint"), null);
+            return false;
         }
-        worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
+
+        return true;
     }

     private void startConnector(String connName, Callback<TargetState> onStart) {

View File

@@ -73,8 +73,8 @@ public final class SinkUtils {
     * and then parse them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
     * valued offsets.
     *
-    * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
-    * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+    * @param partitionOffsets the partitions to offset map that needs to be validated and parsed; may not be null or empty
+    * @return the parsed mapping from {@link TopicPartition}s to their corresponding {@link Long} valued offsets; may not be null or empty
     *
     * @throws BadRequestException if the provided offsets aren't in the expected format
     */
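The expected format here is the sink-connector offset representation used by the offsets REST API. A small sketch of building such a partition-to-offset map (the kafka_topic / kafka_partition / kafka_offset key names mirror the constants defined in SinkUtils; the topic name and values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class SinkOffsetsMapExample {
        public static void main(String[] args) {
            // Identifies one Kafka topic partition that the sink connector consumes from
            Map<String, Object> partition = new HashMap<>();
            partition.put("kafka_topic", "test-topic");
            partition.put("kafka_partition", 0);

            // The consumer group offset for that partition; a null offset map in an
            // alter request means "reset this partition" instead
            Map<String, Object> offset = new HashMap<>();
            offset.put("kafka_offset", 100L);

            Map<Map<String, Object>, Map<String, Object>> partitionOffsets = new HashMap<>();
            partitionOffsets.put(partition, offset);
            System.out.println(partitionOffsets);
        }
    }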


@@ -67,29 +67,33 @@ import static org.junit.Assert.assertTrue;
 */
@Category(IntegrationTest.class)
public class OffsetsApiIntegrationTest {
-   private static final String CONNECTOR_NAME = "test-connector";
-   private static final String TOPIC = "test-topic";
-   private static final Integer NUM_TASKS = 2;
    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
    private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
    private static final int NUM_WORKERS = 3;
+   private static final String CONNECTOR_NAME = "test-connector";
+   private static final String TOPIC = "test-topic";
+   private static final int NUM_TASKS = 2;
+   private static final int NUM_RECORDS_PER_PARTITION = 10;
    private Map<String, String> workerProps;
+   private EmbeddedConnectCluster.Builder connectBuilder;
    private EmbeddedConnectCluster connect;

    @Before
    public void setup() {
+       Properties brokerProps = new Properties();
+       brokerProps.put("transaction.state.log.replication.factor", "1");
+       brokerProps.put("transaction.state.log.min.isr", "1");
+
        // setup Connect worker properties
        workerProps = new HashMap<>();
        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));

        // build a Connect cluster backed by Kafka and Zk
-       connect = new EmbeddedConnectCluster.Builder()
+       connectBuilder = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(NUM_WORKERS)
-               .workerProps(workerProps)
-               .build();
-       connect.start();
+               .brokerProps(brokerProps)
+               .workerProps(workerProps);
    }
    @After

@@ -99,6 +103,8 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testGetNonExistentConnectorOffsets() {
+       connect = connectBuilder.build();
+       connect.start();
        ConnectRestException e = assertThrows(ConnectRestException.class,
                () -> connect.connectorOffsets("non-existent-connector"));
        assertEquals(404, e.errorCode());

@@ -106,11 +112,15 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testGetSinkConnectorOffsets() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
    }

    @Test
    public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
                "overridden-group-id");

@@ -126,6 +136,8 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());

        try (AutoCloseable ignored = kafkaCluster::stop) {

@@ -144,9 +156,9 @@ public class OffsetsApiIntegrationTest {
    private void getAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
        kafkaCluster.createTopic(TOPIC, 5);

-       // Produce 10 messages to each partition
+       // Produce records to each partition
        for (int partition = 0; partition < 5; partition++) {
-           for (int message = 0; message < 10; message++) {
+           for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
                kafkaCluster.produce(TOPIC, partition, "key", "value");
            }
        }
@@ -156,27 +168,31 @@ public class OffsetsApiIntegrationTest {
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
                "Connector tasks did not start in time.");

-       waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", 5, 10,
+       verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, NUM_RECORDS_PER_PARTITION,
                "Sink connector consumer group offsets should catch up to the topic end offsets");

-       // Produce 10 more messages to each partition
+       // Produce more records to each partition
        for (int partition = 0; partition < 5; partition++) {
-           for (int message = 0; message < 10; message++) {
+           for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
                kafkaCluster.produce(TOPIC, partition, "key", "value");
            }
        }

-       waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", 5, 20,
+       verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, 2 * NUM_RECORDS_PER_PARTITION,
                "Sink connector consumer group offsets should catch up to the topic end offsets");
    }

    @Test
    public void testGetSourceConnectorOffsets() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    @Test
    public void testGetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
        connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
        getAndVerifySourceConnectorOffsets(connectorConfigs);

@@ -184,6 +200,8 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testGetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());

        try (AutoCloseable ignored = kafkaCluster::stop) {

@@ -205,19 +223,21 @@ public class OffsetsApiIntegrationTest {
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
                "Connector tasks did not start in time.");

-       waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 10,
+       verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                "Source connector offsets should reflect the expected number of records produced");

-       // Each task should produce 10 more records
-       connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, "20");
+       // Each task should produce more records
+       connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(2 * NUM_RECORDS_PER_PARTITION));
        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);

-       waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 20,
+       verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION,
                "Source connector offsets should reflect the expected number of records produced");
    }

    @Test
    public void testAlterOffsetsNonExistentConnector() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        ConnectRestException e = assertThrows(ConnectRestException.class,
                () -> connect.alterConnectorOffsets("non-existent-connector", new ConnectorOffsets(Collections.singletonList(
                        new ConnectorOffset(Collections.emptyMap(), Collections.emptyMap())))));
@@ -226,6 +246,8 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testAlterOffsetsNonStoppedConnector() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        // Create source connector
        connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,

@@ -260,11 +282,15 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testAlterSinkConnectorOffsets() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
    }

    @Test
    public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
                "overridden-group-id");

@@ -279,6 +305,8 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());

        try (AutoCloseable ignored = kafkaCluster::stop) {

@@ -296,12 +324,11 @@ public class OffsetsApiIntegrationTest {
    private void alterAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
        int numPartitions = 3;
-       int numMessages = 10;
        kafkaCluster.createTopic(TOPIC, numPartitions);

-       // Produce numMessages messages to each partition
+       // Produce records to each partition
        for (int partition = 0; partition < numPartitions; partition++) {
-           for (int message = 0; message < numMessages; message++) {
+           for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
                kafkaCluster.produce(TOPIC, partition, "key", "value");
            }
        }

@@ -310,7 +337,7 @@ public class OffsetsApiIntegrationTest {
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
                "Connector tasks did not start in time.");

-       waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, numMessages,
+       verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
                "Sink connector consumer group offsets should catch up to the topic end offsets");

        connect.stopConnector(CONNECTOR_NAME);

@@ -337,7 +364,7 @@ public class OffsetsApiIntegrationTest {
        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " +
                "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));

-       waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions - 1, 5,
+       verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 5,
                "Sink connector consumer group offsets should reflect the altered offsets");

        // Update the connector's configs; this time expect SinkConnector::alterOffsets to return true

@@ -356,7 +383,7 @@ public class OffsetsApiIntegrationTest {
        response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
        assertThat(response, containsString("The offsets for this connector have been altered successfully"));

-       waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions - 1, 3,
+       verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 3,
                "Sink connector consumer group offsets should reflect the altered offsets");

        // Resume the connector and expect its offsets to catch up to the latest offsets
@@ -366,16 +393,18 @@ public class OffsetsApiIntegrationTest {
                NUM_TASKS,
                "Connector tasks did not resume in time"
        );
-       waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, 10,
+       verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
                "Sink connector consumer group offsets should catch up to the topic end offsets");
    }

    @Test
    public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        connect.kafka().createTopic(TOPIC, 1);

-       // Produce 10 messages
-       for (int message = 0; message < 10; message++) {
+       // Produce records
+       for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
            connect.kafka().produce(TOPIC, 0, "key", "value");
        }

@@ -404,6 +433,8 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testAlterSinkConnectorOffsetsInvalidRequestBody() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        // Create a sink connector and stop it
        connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorConfigs());
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,

@@ -466,18 +497,24 @@ public class OffsetsApiIntegrationTest {
    @Test
    public void testAlterSourceConnectorOffsets() throws Exception {
-       alterAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs());
+       connect = connectBuilder.build();
+       connect.start();
+       alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    @Test
    public void testAlterSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
        connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
-       alterAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+       alterAndVerifySourceConnectorOffsets(connectorConfigs);
    }

    @Test
    public void testAlterSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+       connect = connectBuilder.build();
+       connect.start();
        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());

        try (AutoCloseable ignored = kafkaCluster::stop) {

@@ -489,36 +526,26 @@ public class OffsetsApiIntegrationTest {
        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                kafkaCluster.bootstrapServers());

-       alterAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+       alterAndVerifySourceConnectorOffsets(connectorConfigs);
        }
    }

    @Test
    public void testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
-       Properties brokerProps = new Properties();
-       brokerProps.put("transaction.state.log.replication.factor", "1");
-       brokerProps.put("transaction.state.log.min.isr", "1");
        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
-       EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder()
-               .name("connect-cluster")
-               .brokerProps(brokerProps)
-               .numWorkers(NUM_WORKERS)
-               .workerProps(workerProps)
-               .build();
-       exactlyOnceSupportEnabledConnectCluster.start();
+       connect = connectBuilder.workerProps(workerProps).build();
+       connect.start();

-       try (AutoCloseable ignored = exactlyOnceSupportEnabledConnectCluster::stop) {
-           alterAndVerifySourceConnectorOffsets(exactlyOnceSupportEnabledConnectCluster, baseSourceConnectorConfigs());
-       }
+       alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

-   public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect, Map<String, String> connectorConfigs) throws Exception {
+   public void alterAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
        // Create source connector
        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
                "Connector tasks did not start in time.");

-       waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 10,
+       verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                "Source connector offsets should reflect the expected number of records produced");

        connect.stopConnector(CONNECTOR_NAME);

@@ -540,7 +567,7 @@ public class OffsetsApiIntegrationTest {
        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " +
                "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));

-       waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 5,
+       verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 5,
                "Source connector offsets should reflect the altered offsets");

        // Update the connector's configs; this time expect SourceConnector::alterOffsets to return true

@@ -560,7 +587,7 @@ public class OffsetsApiIntegrationTest {
        response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
        assertThat(response, containsString("The offsets for this connector have been altered successfully"));

-       waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 7,
+       verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 7,
                "Source connector offsets should reflect the altered offsets");

        // Resume the connector and expect its offsets to catch up to the latest offsets

@@ -570,7 +597,277 @@ public class OffsetsApiIntegrationTest {
                NUM_TASKS,
                "Connector tasks did not resume in time"
        );
-       waitForExpectedSourceConnectorOffsets(connect, CONNECTOR_NAME, NUM_TASKS, 10,
+       verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                "Source connector offsets should reflect the expected number of records produced");
}
@Test
public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception {
connect = connectBuilder.build();
connect.start();
// Create a source connector and stop it
connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks did not start in time.");
connect.stopConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector did not stop in time"
);
String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
String content = "[]";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(500, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
}
content = "{}";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(400, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
}
content = "{\"key\": []}";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(500, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Unrecognized field"));
}
content = "{\"offsets\": []}";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(400, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
}
content = "{\"offsets\": {}}";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(500, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
}
content = "{\"offsets\": [123]}";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(500, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Cannot construct instance"));
}
content = "{\"offsets\": [{\"key\": \"val\"}]}";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(500, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Unrecognized field"));
}
content = "{\"offsets\": [{\"partition\": []]}]}";
try (Response response = connect.requestPatch(url, content)) {
assertEquals(500, response.getStatus());
assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
}
}
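For contrast with the malformed payloads exercised above, a well-formed alter request body looks like the sketch below. For a source connector the partition and offset keys are connector-defined, so the key names here are illustrative only:

    public class ValidAlterOffsetsBody {
        public static void main(String[] args) {
            // A valid body for PATCH /connectors/{connector}/offsets: a top-level "offsets"
            // array whose entries each carry a "partition" map and an "offset" map
            // (an explicit null "offset" requests a reset for that partition)
            String content = "{\"offsets\": [" +
                    "{\"partition\": {\"sourcePartitionKey\": \"sourcePartitionValue\"}, " + // illustrative keys
                    "\"offset\": {\"sourceOffsetKey\": 5}}" +
                    "]}";
            System.out.println(content);
        }
    }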
@Test
public void testResetSinkConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
}
@Test
public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
"overridden-group-id");
resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
// Ensure that the overridden consumer group ID was the one actually used
try (Admin admin = connect.kafka().createAdminClient()) {
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
}
}
@Test
public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
kafkaCluster.start();
Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
kafkaCluster.bootstrapServers());
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
kafkaCluster.bootstrapServers());
resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster);
}
}
private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
int numPartitions = 3;
kafkaCluster.createTopic(TOPIC, numPartitions);
// Produce records to each partition
for (int partition = 0; partition < numPartitions; partition++) {
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
kafkaCluster.produce(TOPIC, partition, "key", "value");
}
}
// Create sink connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
connect.stopConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector did not stop in time"
);
// Reset the sink connector's offsets
String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
// Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error
response = connect.resetConnectorOffsets(CONNECTOR_NAME);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
// Resume the connector and expect its offsets to catch up to the latest offsets
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector tasks did not resume in time"
);
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
}
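These assertions rely on a reset clearing the sink connector's consumer group offsets. A sketch of the equivalent effect expressed directly against the Admin API, assuming the connect-<connector name> consumer group naming convention that SinkUtils.consumerGroupId produces (bootstrap servers are illustrative):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class SinkResetEquivalent {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            try (Admin admin = Admin.create(props)) {
                // Removing the group's committed offsets is what makes a subsequent
                // GET on the connector's offsets endpoint return an empty result
                admin.deleteConsumerGroups(Collections.singletonList("connect-test-connector")).all().get();
            }
        }
    }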
@Test
public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
connect = connectBuilder.build();
connect.start();
connect.kafka().createTopic(TOPIC, 1);
// Produce records
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
connect.kafka().produce(TOPIC, 0, "key", "value");
}
// Configure a sink connector whose sink task blocks in its stop method
Map<String, String> connectorConfigs = new HashMap<>();
connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
connectorConfigs.put(TOPICS_CONFIG, TOPIC);
connectorConfigs.put("block", "Task::stop");
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
"Connector tasks did not start in time.");
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
connect.stopConnector(CONNECTOR_NAME);
// Try to reset the offsets
ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
assertThat(e.getMessage(), containsString("zombie sink task"));
}
@Test
public void testResetSourceConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
}
@Test
public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
resetAndVerifySourceConnectorOffsets(connectorConfigs);
}
@Test
public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
kafkaCluster.start();
Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
kafkaCluster.bootstrapServers());
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
kafkaCluster.bootstrapServers());
resetAndVerifySourceConnectorOffsets(connectorConfigs);
}
}
@Test
public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
connect = connectBuilder.workerProps(workerProps).build();
connect.start();
resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
}
public void resetAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
// Create source connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced");
connect.stopConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector did not stop in time"
);
// Reset the source connector's offsets
String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
// Reset the source connector's offsets again while it is still in a STOPPED state and ensure that there is no error
response = connect.resetConnectorOffsets(CONNECTOR_NAME);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
// Resume the connector and expect its offsets to catch up to the latest offsets
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector tasks did not resume in time"
);
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced"); "Source connector offsets should reflect the expected number of records produced");
} }
@@ -590,7 +887,7 @@ public class OffsetsApiIntegrationTest {
        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
        props.put(TOPIC_CONFIG, TOPIC);
        props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "3");
-       props.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, "10");
+       props.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(NUM_RECORDS_PER_PARTITION));
        props.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
        props.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, "1");
@@ -600,8 +897,8 @@ public class OffsetsApiIntegrationTest {
    /**
     * Verify whether the actual consumer group offsets for a sink connector match the expected offsets. The verification
-    * is done using the `GET /connectors/{connector}/offsets` REST API which is repeatedly queried until the offsets match
-    * or the {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes the following:
+    * is done using the <strong><em>GET /connectors/{connector}/offsets</em></strong> REST API which is repeatedly queried
+    * until the offsets match or the {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes the following:
     * <ol>
     * <li>The sink connector is consuming from a single Kafka topic</li>
     * <li>The expected offset for each partition in the topic is the same</li>

@@ -615,8 +912,8 @@ public class OffsetsApiIntegrationTest {
     * 10 records)
     * @throws InterruptedException if the thread is interrupted while waiting for the actual offsets to match the expected offsets
     */
-   private void waitForExpectedSinkConnectorOffsets(String connectorName, String expectedTopic, int expectedPartitions,
+   private void verifyExpectedSinkConnectorOffsets(String connectorName, String expectedTopic, int expectedPartitions,
                                                    int expectedOffset, String conditionDetails) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
            if (offsets.offsets().size() != expectedPartitions) {
@@ -634,11 +931,10 @@ public class OffsetsApiIntegrationTest {
    /**
     * Verify whether the actual offsets for a source connector match the expected offsets. The verification is done using the
-    * `GET /connectors/{connector}/offsets` REST API which is repeatedly queried until the offsets match or the
-    * {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes that the source connector is a
+    * <strong><em>GET /connectors/{connector}/offsets</em></strong> REST API which is repeatedly queried until the offsets match
+    * or the {@link #OFFSET_READ_TIMEOUT_MS timeout} is reached. Note that this assumes that the source connector is a
     * {@link MonitorableSourceConnector}
     *
-    * @param connect the Connect cluster that is running the source connector
     * @param connectorName the name of the source connector whose offsets are to be verified
     * @param numTasks the number of tasks for the source connector
     * @param expectedOffset the expected offset for each source partition

@@ -646,8 +942,8 @@ public class OffsetsApiIntegrationTest {
     * 10 records)
     * @throws InterruptedException if the thread is interrupted while waiting for the actual offsets to match the expected offsets
     */
-   private void waitForExpectedSourceConnectorOffsets(EmbeddedConnectCluster connect, String connectorName, int numTasks,
+   private void verifyExpectedSourceConnectorOffsets(String connectorName, int numTasks,
                                                      int expectedOffset, String conditionDetails) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
            // The MonitorableSourceConnector has a source partition per task

@@ -663,4 +959,19 @@ public class OffsetsApiIntegrationTest {
            return true;
        }, OFFSET_READ_TIMEOUT_MS, conditionDetails);
    }
/**
* Verify whether the <strong><em>GET /connectors/{connector}/offsets</em></strong> returns empty offsets for a source
* or sink connector whose offsets have been reset via the <strong><em>DELETE /connectors/{connector}/offsets</em></strong>
* REST API
*
* @param connectorName the name of the connector whose offsets are to be verified
* @throws InterruptedException if the thread is interrupted while waiting for the offsets to be empty
*/
private void verifyEmptyConnectorOffsets(String connectorName) throws InterruptedException {
TestUtils.waitForCondition(() -> {
ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
return offsets.offsets().isEmpty();
}, OFFSET_READ_TIMEOUT_MS, "Connector offsets should be empty after resetting offsets");
}
}
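As a quick cross-check of the behavior verified above, the offsets endpoint of a freshly reset connector returns an empty list. A sketch of reading it over HTTP (worker URL and connector name are illustrative):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class EmptyOffsetsCheck {
        public static void main(String[] args) throws Exception {
            HttpRequest get = HttpRequest.newBuilder(
                    URI.create("http://localhost:8083/connectors/test-connector/offsets")) // illustrative
                    .GET().build();
            // After a successful reset the expected body is {"offsets": []}
            System.out.println(HttpClient.newHttpClient()
                    .send(get, HttpResponse.BodyHandlers.ofString()).body());
        }
    }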


@@ -22,6 +22,8 @@ import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.FenceProducersResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;

@@ -61,6 +63,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.Message;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;

@@ -89,6 +92,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;

@@ -134,6 +138,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

@@ -144,6 +149,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anySet;
@@ -1819,16 +1825,24 @@ public class WorkerTest {
        verifyKafkaClusterId();
    }

-   @SuppressWarnings("unchecked")
    private void mockAdminListConsumerGroupOffsets(Admin admin, Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets, Exception e) {
+       mockAdminListConsumerGroupOffsets(admin, consumerGroupOffsets, e, null, 0);
+   }
+
+   private void mockAdminListConsumerGroupOffsets(Admin admin, Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets, Exception e, Time time, long delayMs) {
        ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
        when(admin.listConsumerGroupOffsets(anyString(), any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result);
-       KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> adminFuture = mock(KafkaFuture.class);
-       when(result.partitionsToOffsetAndMetadata()).thenReturn(adminFuture);
-       when(adminFuture.whenComplete(any())).thenAnswer(invocation -> {
-           ((KafkaFuture.BiConsumer<Map<TopicPartition, OffsetAndMetadata>, Throwable>) invocation.getArgument(0))
-                   .accept(consumerGroupOffsets, e);
-           return null;
+       KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> adminFuture = new KafkaFutureImpl<>();
+       if (e != null) {
+           adminFuture.completeExceptionally(e);
+       } else {
+           adminFuture.complete(consumerGroupOffsets);
+       }
+       when(result.partitionsToOffsetAndMetadata()).thenAnswer(invocation -> {
+           if (time != null) {
+               time.sleep(delayMs);
+           }
+           return adminFuture;
        });
    }
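The switch from a Mockito-mocked KafkaFuture to a real KafkaFutureImpl is what lets the helper above model delayed or failed admin responses without stubbing every future method. A minimal sketch of the pattern:

    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.internals.KafkaFutureImpl;

    public class CompletableFutureStub {
        public static void main(String[] args) throws Exception {
            // KafkaFutureImpl is a concrete, completable KafkaFuture: a test can complete
            // it up front (normally or exceptionally) and hand it to the code under test
            KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
            future.complete("offsets-listed");
            KafkaFuture<String> result = future;
            System.out.println(result.get()); // prints "offsets-listed"
        }
    }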
@@ -1915,13 +1929,13 @@ public class WorkerTest {
                "support altering of offsets"));

        FutureCallback<Message> cb = new FutureCallback<>();
-       worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps,
+       worker.modifyConnectorOffsets(CONNECTOR_ID, connectorProps,
                Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
                cb);

        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
        assertEquals(ConnectException.class, e.getCause().getClass());
-       assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + " because it doesn't support external modification of offsets",
+       assertEquals("Failed to modify offsets for connector " + CONNECTOR_ID + " because it doesn't support external modification of offsets",
                e.getCause().getMessage());

        verifyGenericIsolation();
@@ -1952,7 +1966,7 @@ public class WorkerTest {
        });

        FutureCallback<Message> cb = new FutureCallback<>();
-       worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+       worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
        assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());

@@ -1988,7 +2002,7 @@ public class WorkerTest {
        });

        FutureCallback<Message> cb = new FutureCallback<>();
-       worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+       worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message());
        assertEquals(ConnectException.class, e.getCause().getClass());

@@ -2002,7 +2016,7 @@ public class WorkerTest {
    }

    @Test
-   public void testAlterOffsetsSinkConnectorNoResets() throws Exception {
+   public void testAlterOffsetsSinkConnectorNoDeletes() throws Exception {
        @SuppressWarnings("unchecked")
        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class);
        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();

@@ -2024,7 +2038,7 @@ public class WorkerTest {
    }

    @Test
-   public void testAlterOffsetSinkConnectorOnlyResets() throws Exception {
+   public void testAlterOffsetSinkConnectorOnlyDeletes() throws Exception {
        @SuppressWarnings("unchecked")
        ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class);
        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();

@@ -2049,7 +2063,7 @@ public class WorkerTest {
    }

    @Test
-   public void testAlterOffsetsSinkConnectorAltersAndResets() throws Exception {
+   public void testAlterOffsetsSinkConnectorAltersAndDeletes() throws Exception {
        @SuppressWarnings("unchecked")
        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class);
        @SuppressWarnings("unchecked")

@@ -2109,7 +2123,7 @@ public class WorkerTest {
        }

        FutureCallback<Message> cb = new FutureCallback<>();
-       worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
+       worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
                Thread.currentThread().getContextClassLoader(), cb);
        assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());

@@ -2145,7 +2159,7 @@ public class WorkerTest {
                Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100"));

        FutureCallback<Message> cb = new FutureCallback<>();
-       worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
+       worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
                Thread.currentThread().getContextClassLoader(), cb);

        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));

@@ -2194,7 +2208,7 @@ public class WorkerTest {
        partitionOffsets.put(partition2, null);

        FutureCallback<Message> cb = new FutureCallback<>();
-       worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
+       worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
                Thread.currentThread().getContextClassLoader(), cb);

        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));

@@ -2229,7 +2243,7 @@ public class WorkerTest {
        partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100"));

        FutureCallback<Message> cb = new FutureCallback<>();
-       worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
+       worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
                Thread.currentThread().getContextClassLoader(), cb);

        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));

@@ -2240,6 +2254,193 @@ public class WorkerTest {
        verifyKafkaClusterId();
    }
@Test
@SuppressWarnings("unchecked")
public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Exception {
Map<String, String> workerProps = new HashMap<>(this.workerProps);
workerProps.put("exactly.once.source.support", "enabled");
workerProps.put("bootstrap.servers", "localhost:9092");
workerProps.put("group.id", "connect-cluster");
workerProps.put("config.storage.topic", "connect-configs");
workerProps.put("offset.storage.topic", "connect-offsets");
workerProps.put("status.storage.topic", "connect-statuses");
config = new DistributedConfig(workerProps);
mockKafkaClusterId();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
allConnectorClientConfigOverridePolicy, null);
worker.start();
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
Set<Map<String, Object>> connectorPartitions = new HashSet<>();
connectorPartitions.add(Collections.singletonMap("partitionKey1", new Object()));
connectorPartitions.add(Collections.singletonMap("partitionKey2", new Object()));
when(offsetStore.connectorPartitions(eq(CONNECTOR_ID))).thenReturn(connectorPartitions);
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
invocation.getArgument(0, Callback.class).onCompletion(null, null);
return null;
});
FutureCallback<Message> cb = new FutureCallback<>();
worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, null, offsetStore, producer,
offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
assertEquals("The offsets for this connector have been reset successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
InOrder inOrder = Mockito.inOrder(offsetStore, offsetWriter, producer);
inOrder.verify(offsetStore).start();
connectorPartitions.forEach(partition -> inOrder.verify(offsetWriter).offset(partition, null));
inOrder.verify(offsetWriter).beginFlush();
inOrder.verify(producer).initTransactions();
inOrder.verify(producer).beginTransaction();
inOrder.verify(offsetWriter).doFlush(any());
inOrder.verify(producer).commitTransaction();
inOrder.verify(offsetStore, timeout(1000)).stop();
verifyKafkaClusterId();
}
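
The inOrder verifications above pin down the order of operations for an exactly-once reset: tombstone every known source partition, then flush inside a producer transaction. A minimal sketch of that sequence, assuming a helper with the same collaborators as the test's mocks (this illustrates what the test verifies, not the Worker's actual code):

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageWriter;

static void resetSourceOffsetsTransactionally(String connectorName,
        ConnectorOffsetBackingStore offsetStore,
        OffsetStorageWriter offsetWriter,
        KafkaProducer<byte[], byte[]> producer) {
    offsetStore.start();
    try {
        // A null offset is a tombstone: it resets the offset for that source partition
        for (Map<String, Object> partition : offsetStore.connectorPartitions(connectorName)) {
            offsetWriter.offset(partition, null);
        }
        offsetWriter.beginFlush();
        // With exactly-once source support enabled, the tombstones are committed transactionally
        producer.initTransactions();
        producer.beginTransaction();
        offsetWriter.doFlush((error, ignored) -> { });
        producer.commitTransaction();
    } finally {
        offsetStore.stop();
    }
}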
@Test
public void testResetOffsetsSinkConnector() throws Exception {
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
Admin admin = mock(Admin.class);
Time time = new MockTime();
worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(),
allConnectorClientConfigOverridePolicy, config -> admin);
worker.start();
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
TopicPartition tp = new TopicPartition("test-topic", 0);
mockAdminListConsumerGroupOffsets(admin, Collections.singletonMap(tp, new OffsetAndMetadata(10L)), null, time, 2000);
when(sinkConnector.alterOffsets(eq(connectorProps), eq(Collections.singletonMap(tp, null)))).thenAnswer(invocation -> {
time.sleep(3000);
return true;
});
DeleteConsumerGroupsResult deleteConsumerGroupsResult = mock(DeleteConsumerGroupsResult.class);
when(admin.deleteConsumerGroups(anyCollection(), any(DeleteConsumerGroupsOptions.class))).thenReturn(deleteConsumerGroupsResult);
when(deleteConsumerGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
FutureCallback<Message> cb = new FutureCallback<>();
worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, null,
Thread.currentThread().getContextClassLoader(), cb);
assertEquals("The offsets for this connector have been reset successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
ArgumentCaptor<DeleteConsumerGroupsOptions> deleteConsumerGroupsOptionsArgumentCaptor = ArgumentCaptor.forClass(DeleteConsumerGroupsOptions.class);
verify(admin).deleteConsumerGroups(anyCollection(), deleteConsumerGroupsOptionsArgumentCaptor.capture());
// Expect the Admin::deleteConsumerGroups call to be made with a timeout equal to the overall REST request timeout
// (DEFAULT_REST_REQUEST_TIMEOUT_MS) minus the 2000 ms delay introduced in the call to Admin::listConsumerGroupOffsets
// and the 3000 ms delay introduced in the call to SinkConnector::alterOffsets
assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L - 3000L,
deleteConsumerGroupsOptionsArgumentCaptor.getValue().timeoutMs().intValue());
verify(admin, timeout(1000)).close();
verifyKafkaClusterId();
}
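
The assertion above encodes a timeout budget: each blocking admin and connector call consumes part of the overall REST request timeout, and whatever remains is handed to Admin::deleteConsumerGroups. A hedged sketch of that arithmetic (the helper and its parameters are illustrative, not the Worker's exact code):

import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.common.utils.Time;

static void deleteGroupWithRemainingBudget(Admin admin, String groupId, Time time, long deadlineMs) {
    // deadlineMs was fixed up-front as now + DEFAULT_REST_REQUEST_TIMEOUT_MS; by this point the
    // listConsumerGroupOffsets call (2000 ms here) and alterOffsets (3000 ms here) have consumed part of it
    int remainingMs = (int) Math.max(0, deadlineMs - time.milliseconds());
    admin.deleteConsumerGroups(Collections.singleton(groupId),
            new DeleteConsumerGroupsOptions().timeoutMs(remainingMs));
}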
@Test
public void testResetOffsetsSinkConnectorDeleteConsumerGroupError() throws Exception {
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
Admin admin = mock(Admin.class);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newCachedThreadPool(),
allConnectorClientConfigOverridePolicy, config -> admin);
worker.start();
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
TopicPartition tp = new TopicPartition("test-topic", 0);
mockAdminListConsumerGroupOffsets(admin, Collections.singletonMap(tp, new OffsetAndMetadata(10L)), null);
when(sinkConnector.alterOffsets(eq(connectorProps), eq(Collections.singletonMap(tp, null)))).thenReturn(true);
DeleteConsumerGroupsResult deleteConsumerGroupsResult = mock(DeleteConsumerGroupsResult.class);
when(admin.deleteConsumerGroups(anyCollection(), any(DeleteConsumerGroupsOptions.class))).thenReturn(deleteConsumerGroupsResult);
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new ClusterAuthorizationException("Test exception"));
when(deleteConsumerGroupsResult.all()).thenReturn(future);
FutureCallback<Message> cb = new FutureCallback<>();
worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, null,
Thread.currentThread().getContextClassLoader(), cb);
ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
assertEquals(ConnectException.class, e.getCause().getClass());
verify(admin, timeout(1000)).close();
verifyKafkaClusterId();
}
@Test
@SuppressWarnings("unchecked")
public void testModifySourceConnectorOffsetsTimeout() throws Exception {
mockKafkaClusterId();
Time time = new MockTime();
worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
allConnectorClientConfigOverridePolicy, null);
worker.start();
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> {
time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
return true;
});
ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
Map<Map<String, ?>, Map<String, ?>> partitionOffsets = Collections.singletonMap(
Collections.singletonMap("partitionKey", "partitionValue"),
Collections.singletonMap("offsetKey", "offsetValue"));
FutureCallback<Message> cb = new FutureCallback<>();
worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message());
assertEquals(ConnectException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage(), containsString("Timed out"));
verify(offsetStore).start();
verify(offsetStore, timeout(1000)).stop();
verifyKafkaClusterId();
}
@Test
public void testModifyOffsetsSinkConnectorTimeout() throws Exception {
mockKafkaClusterId();
String connectorClass = SampleSinkConnector.class.getName();
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
Admin admin = mock(Admin.class);
Time time = new MockTime();
worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(),
allConnectorClientConfigOverridePolicy, config -> admin);
worker.start();
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> {
time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
return true;
});
FutureCallback<Message> cb = new FutureCallback<>();
worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, new HashMap<>(),
Thread.currentThread().getContextClassLoader(), cb);
ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message());
assertEquals(ConnectException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage(), containsString("Timed out"));
verify(admin, timeout(1000)).close();
verifyKafkaClusterId();
}
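
Both timeout tests use the same trick: the mocked alterOffsets call advances the MockTime past the request deadline, and the worker is then expected to fail the callback instead of continuing. A sketch of the kind of deadline guard this implies (the name and placement are assumptions; the tests only assert a ConnectException whose message contains "Timed out"):

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;

static void checkDeadline(Time time, long deadlineMs, String connName) {
    if (time.milliseconds() >= deadlineMs) {
        throw new ConnectException("Timed out while modifying offsets for connector " + connName);
    }
}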
private void assertStatusMetrics(long expected, String metricName) {
MetricGroup statusMetrics = worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector());
if (expected == 0L) {
View File
@@ -122,6 +122,7 @@ import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CON
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.isNull;
import static org.easymock.EasyMock.leq;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
@@ -4146,7 +4147,7 @@ public class DistributedHerderTest {
}
@Test
-public void testAlterConnectorOffsetsUnknownConnector() throws Exception {
+public void testModifyConnectorOffsetsUnknownConnector() throws Exception {
// Get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
@@ -4155,7 +4156,7 @@ public class DistributedHerderTest {
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
-// Now handle the alter connector offsets request
+// Now handle the connector offsets modification request
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
@@ -4168,7 +4169,7 @@ public class DistributedHerderTest {
herder.tick();
FutureCallback<Message> callback = new FutureCallback<>();
-herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback);
+herder.modifyConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback);
herder.tick();
ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
assertTrue(e.getCause() instanceof NotFoundException);
@@ -4177,7 +4178,7 @@ public class DistributedHerderTest {
}
@Test
-public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+public void testModifyOffsetsConnectorNotInStoppedState() throws Exception {
// Get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
@@ -4186,7 +4187,7 @@ public class DistributedHerderTest {
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
-// Now handle the alter connector offsets request
+// Now handle the connector offsets modification request
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
@@ -4199,7 +4200,7 @@ public class DistributedHerderTest {
herder.tick();
FutureCallback<Message> callback = new FutureCallback<>();
-herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+herder.modifyConnectorOffsets(CONN1, null, callback);
herder.tick();
ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
assertTrue(e.getCause() instanceof BadRequestException);
@@ -4208,7 +4209,7 @@ public class DistributedHerderTest {
}
@Test
-public void testAlterOffsetsNotLeader() throws Exception {
+public void testModifyOffsetsNotLeader() throws Exception {
// Get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("member");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
@@ -4217,7 +4218,7 @@ public class DistributedHerderTest {
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
-// Now handle the alter connector offsets request
+// Now handle the connector offsets modification request
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
@@ -4229,7 +4230,7 @@ public class DistributedHerderTest {
herder.tick();
FutureCallback<Message> callback = new FutureCallback<>();
-herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+herder.modifyConnectorOffsets(CONN1, new HashMap<>(), callback);
herder.tick();
ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
assertTrue(e.getCause() instanceof NotLeaderException);
@@ -4238,7 +4239,7 @@ public class DistributedHerderTest {
}
@Test
-public void testAlterOffsetsSinkConnector() throws Exception {
+public void testModifyOffsetsSinkConnector() throws Exception {
EasyMock.reset(herder);
EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SINK).anyTimes();
PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
@@ -4253,7 +4254,9 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// Now handle the alter connector offsets request
-Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+Collections.singletonMap("partitionKey", "partitionValue"),
+Collections.singletonMap("offsetKey", "offsetValue"));
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
@@ -4262,7 +4265,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
-worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
+worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
Message msg = new Message("The offsets for this connector have been altered successfully");
EasyMock.expectLastCall().andAnswer(() -> {
workerCallbackCapture.getValue().onCompletion(null, msg);
@@ -4282,7 +4285,7 @@ public class DistributedHerderTest {
}
@Test
-public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
+public void testModifyOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
// Get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
@@ -4291,8 +4294,7 @@ public class DistributedHerderTest {
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
-// Now handle the alter connector offsets request
-Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+// Now handle the reset connector offsets request
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
@@ -4301,8 +4303,8 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
-worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
-Message msg = new Message("The offsets for this connector have been altered successfully");
+worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), isNull(), capture(workerCallbackCapture));
+Message msg = new Message("The offsets for this connector have been reset successfully");
EasyMock.expectLastCall().andAnswer(() -> {
workerCallbackCapture.getValue().onCompletion(null, msg);
return null;
@@ -4312,16 +4314,16 @@ public class DistributedHerderTest {
herder.tick();
FutureCallback<Message> callback = new FutureCallback<>();
-herder.alterConnectorOffsets(CONN1, offsets, callback);
+herder.resetConnectorOffsets(CONN1, callback);
herder.tick();
assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
-assertEquals("The offsets for this connector have been altered successfully", msg.message());
+assertEquals("The offsets for this connector have been reset successfully", msg.message());
PowerMock.verifyAll();
}
@Test
-public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws Exception {
+public void testModifyOffsetsSourceConnectorExactlyOnceEnabled() throws Exception {
// Setup herder with exactly-once support for source connectors enabled
herder = exactlyOnceHerder();
rebalanceListener = herder.new RebalanceListener(time);
@@ -4337,7 +4339,9 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().anyTimes();
// Now handle the alter connector offsets request
-Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+Collections.singletonMap("partitionKey", "partitionValue"),
+Collections.singletonMap("offsetKey", "offsetValue"));
member.wakeup();
PowerMock.expectLastCall().anyTimes();
member.ensureActive();
@@ -4354,7 +4358,7 @@ public class DistributedHerderTest {
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void, Void>>anyObject())).andReturn(herderFencingFuture);
// Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active
-// fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to
+// fencing list. The other is the callback passed from DistributedHerder::modifyConnectorOffsets in order to
// queue up the actual alter offsets request if the zombie fencing succeeds.
for (int i = 0; i < 2; i++) {
Capture<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = EasyMock.newCapture();
@@ -4366,7 +4370,7 @@ public class DistributedHerderTest {
Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
Message msg = new Message("The offsets for this connector have been altered successfully");
-worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
+worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
EasyMock.expectLastCall().andAnswer(() -> {
workerCallbackCapture.getValue().onCompletion(null, msg);
return null;
@@ -4374,13 +4378,13 @@ public class DistributedHerderTest {
// Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we
// already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is
-// refreshed once at the beginning of the DistributedHerder::alterConnectorOffsets method, once before checking
+// refreshed once at the beginning of the DistributedHerder::modifyConnectorOffsets method, once before checking
// whether zombie fencing is required, and once before actually proceeding to alter connector offsets.
expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
Capture<Callback<Message>> workerCallbackCapture2 = Capture.newInstance();
-worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2));
+worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2));
EasyMock.expectLastCall().andAnswer(() -> {
workerCallbackCapture2.getValue().onCompletion(null, msg);
return null;
@@ -4406,7 +4410,7 @@ public class DistributedHerderTest {
}
@Test
-public void testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception {
+public void testModifyOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception {
// Setup herder with exactly-once support for source connectors enabled
herder = exactlyOnceHerder();
rebalanceListener = herder.new RebalanceListener(time);
@@ -4421,7 +4425,7 @@ public class DistributedHerderTest {
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall().anyTimes();
-// Now handle the alter connector offsets request
+// Now handle the reset connector offsets request
member.wakeup();
PowerMock.expectLastCall().anyTimes();
member.ensureActive();
@@ -4437,8 +4441,8 @@ public class DistributedHerderTest {
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void, Void>>anyObject())).andReturn(herderFencingFuture);
// Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active
-// fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to
-// queue up the actual alter offsets request if the zombie fencing succeeds.
+// fencing list. The other is the callback passed from DistributedHerder::modifyConnectorOffsets in order to
+// queue up the actual reset offsets request if the zombie fencing succeeds.
for (int i = 0; i < 2; i++) {
Capture<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = EasyMock.newCapture();
EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(() -> {
@@ -4451,14 +4455,14 @@ public class DistributedHerderTest {
herder.tick();
FutureCallback<Message> callback = new FutureCallback<>();
-herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+herder.resetConnectorOffsets(CONN1, callback);
// Process the zombie fencing request
herder.tick();
-// Process the alter offsets request
+// Process the reset offsets request
herder.tick();
ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
assertEquals(ConnectException.class, e.getCause().getClass());
-assertEquals("Failed to perform zombie fencing for source connector prior to altering offsets",
+assertEquals("Failed to perform zombie fencing for source connector prior to modifying offsets",
e.getCause().getMessage());
PowerMock.verifyAll();
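
The two exactly-once tests above trace the same leader-side handoff in DistributedHerder::modifyConnectorOffsets: a round of zombie fencing runs first, and the offsets modification is queued only if fencing succeeds; otherwise the callback fails with the exact message asserted in the failure test. A self-contained sketch of that handoff (the helper is illustrative; in the herder this is wired through its request queue and tick() loop):

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.util.Callback;

static void fenceThenModify(KafkaFuture<Void> fencingFuture, Callback<Message> cb, Runnable doModify) {
    fencingFuture.whenComplete((ignored, error) -> {
        if (error != null) {
            cb.onCompletion(new ConnectException(
                    "Failed to perform zombie fencing for source connector prior to modifying offsets",
                    error), null);
        } else {
            doModify.run(); // in the herder, this enqueues the actual worker.modifyConnectorOffsets call
        }
    });
}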
View File
@@ -838,6 +838,38 @@ public class ConnectorsResourceTest {
assertEquals(msg, response.getEntity());
}
@Test
public void testResetOffsetsNotLeader() throws Throwable {
final ArgumentCaptor<Callback<Message>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackNotLeaderException(cb).when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture());
when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/offsets?forward=true"), eq("DELETE"), isNull(), isNull(), any()))
.thenReturn(new RestClient.HttpResponse<>(200, new HashMap<>(), new Message("")));
connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME);
}
@Test
public void testResetOffsetsConnectorNotFound() {
final ArgumentCaptor<Callback<Message>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new NotFoundException("Connector not found"))
.when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture());
assertThrows(NotFoundException.class, () -> connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME));
}
@Test
public void testResetOffsets() throws Throwable {
final ArgumentCaptor<Callback<Message>> cb = ArgumentCaptor.forClass(Callback.class);
Message msg = new Message("The offsets for this connector have been reset successfully");
doAnswer(invocation -> {
cb.getValue().onCompletion(null, msg);
return null;
}).when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture());
Response response = connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME);
assertEquals(200, response.getStatus());
assertEquals(msg, response.getEntity());
}
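
Together these three tests pin down the REST surface of the new endpoint: DELETE /connectors/{connector}/offsets forwards to the leader when this worker isn't it (note the forwarded URL and "DELETE" method in the not-leader test), propagates NotFoundException, and returns the herder's Message on success. A hedged sketch of the resource method shape they exercise (the annotations and the request-handler helper are assumptions based on the tests' interactions, not the exact source):

import javax.ws.rs.DELETE;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.util.FutureCallback;

@DELETE
@Path("/{connector}/offsets")
public Response resetConnectorOffsets(final @QueryParam("forward") Boolean forward,
        final @Context HttpHeaders headers,
        final @PathParam("connector") String connector) throws Throwable {
    FutureCallback<Message> cb = new FutureCallback<>();
    herder.resetConnectorOffsets(connector, cb);
    // If this worker is not the leader, the handler re-issues the request against
    // LEADER_URL + "connectors/" + connector + "/offsets?forward=true" with method "DELETE" and a null body
    Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets",
            "DELETE", headers, null, forward);
    return Response.ok().entity(msg).build();
}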
private <T> byte[] serializeAsBytes(final T value) throws IOException {
return new ObjectMapper().writeValueAsBytes(value);
}
View File
@@ -90,6 +90,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.isNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
@@ -986,19 +987,26 @@ public class StandaloneHerderTest {
}
@Test
-public void testAlterConnectorOffsetsUnknownConnector() {
+public void testModifyConnectorOffsetsUnknownConnector() {
PowerMock.replayAll();
FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>();
-herder.alterConnectorOffsets("unknown-connector", new HashMap<>(), alterOffsetsCallback);
+herder.alterConnectorOffsets("unknown-connector",
+Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+alterOffsetsCallback);
ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
assertTrue(e.getCause() instanceof NotFoundException);
+FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
+herder.resetConnectorOffsets("unknown-connector", resetOffsetsCallback);
+e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof NotFoundException);
PowerMock.verifyAll();
}
@Test
-public void testAlterConnectorOffsetsConnectorNotInStoppedState() {
+public void testModifyConnectorOffsetsConnectorNotInStoppedState() {
PowerMock.replayAll();
herder.configState = new ClusterConfigState(
@@ -1013,10 +1021,19 @@ public class StandaloneHerderTest {
Collections.emptySet(),
Collections.emptySet()
);
FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>();
-herder.alterConnectorOffsets(CONNECTOR_NAME, new HashMap<>(), alterOffsetsCallback);
+herder.alterConnectorOffsets(CONNECTOR_NAME,
+Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+alterOffsetsCallback);
ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
assertTrue(e.getCause() instanceof BadRequestException);
+FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
+herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback);
+e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof BadRequestException);
PowerMock.verifyAll();
}
@@ -1024,7 +1041,7 @@ public class StandaloneHerderTest {
public void testAlterConnectorOffsets() throws Exception {
Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
Message msg = new Message("The offsets for this connector have been altered successfully");
-worker.alterConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), anyObject(Map.class), capture(workerCallbackCapture));
+worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), anyObject(Map.class), capture(workerCallbackCapture));
EasyMock.expectLastCall().andAnswer(() -> {
workerCallbackCapture.getValue().onCompletion(null, msg);
return null;
@@ -1044,11 +1061,42 @@ public class StandaloneHerderTest {
Collections.emptySet()
);
FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>();
-herder.alterConnectorOffsets(CONNECTOR_NAME, new HashMap<>(), alterOffsetsCallback);
+herder.alterConnectorOffsets(CONNECTOR_NAME,
+Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+alterOffsetsCallback);
assertEquals(msg, alterOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
PowerMock.verifyAll();
}
@Test
public void testResetConnectorOffsets() throws Exception {
Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
Message msg = new Message("The offsets for this connector have been reset successfully");
worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), isNull(), capture(workerCallbackCapture));
EasyMock.expectLastCall().andAnswer(() -> {
workerCallbackCapture.getValue().onCompletion(null, msg);
return null;
});
PowerMock.replayAll();
herder.configState = new ClusterConfigState(
10,
null,
Collections.singletonMap(CONNECTOR_NAME, 0),
Collections.singletonMap(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE)),
Collections.singletonMap(CONNECTOR_NAME, TargetState.STOPPED),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback);
assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
PowerMock.verifyAll();
}
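
Read together with testModifyConnectorOffsetsUnknownConnector and testModifyConnectorOffsetsConnectorNotInStoppedState above, these tests suggest the standalone herder funnels both operations through one path, with a null offsets map signalling a reset. A sketch of that shape, assuming the validation order the tests assert (the names mirror the production classes, but the method bodies are illustrative):

import java.util.Map;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.util.Callback;

public void resetConnectorOffsets(String connName, Callback<Message> cb) {
    modifyConnectorOffsets(connName, null, cb);
}

private void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
    if (!configState.contains(connName)) {
        cb.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
        return;
    }
    if (configState.targetState(connName) != TargetState.STOPPED) {
        cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified"), null);
        return;
    }
    // The worker treats a null offsets argument as a reset request
    worker.modifyConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
}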
private void expectAdd(SourceSink sourceSink) {
Map<String, String> connectorProps = connectorConfig(sourceSink);
ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
View File
@@ -647,7 +647,8 @@ public class EmbeddedConnectCluster {
/**
* Get the offsets for a connector via the <strong><em>GET /connectors/{connector}/offsets</em></strong> endpoint
-* @param connectorName name of the connector
+*
+* @param connectorName name of the connector whose offsets are to be retrieved
* @return the connector's offsets
*/
public ConnectorOffsets connectorOffsets(String connectorName) {
@@ -668,7 +669,8 @@ public class EmbeddedConnectCluster {
/**
* Alter a connector's offsets via the <strong><em>PATCH /connectors/{connector}/offsets</em></strong> endpoint
-* @param connectorName name of the connector
+*
+* @param connectorName name of the connector whose offsets are to be altered
* @param offsets offsets to alter
*/
public String alterConnectorOffsets(String connectorName, ConnectorOffsets offsets) {
@@ -686,7 +688,23 @@ public class EmbeddedConnectCluster {
return responseToString(response);
} else {
throw new ConnectRestException(response.getStatus(),
-"Could not execute PATCH request. Error response: " + responseToString(response));
+"Could not alter connector offsets. Error response: " + responseToString(response));
}
}
/**
* Reset a connector's offsets via the <strong><em>DELETE /connectors/{connector}/offsets</em></strong> endpoint
*
* @param connectorName name of the connector whose offsets are to be reset
*/
public String resetConnectorOffsets(String connectorName) {
String url = endpointForResource(String.format("connectors/%s/offsets", connectorName));
Response response = requestDelete(url);
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
return responseToString(response);
} else {
throw new ConnectRestException(response.getStatus(),
"Could not reset connector offsets. Error response: " + responseToString(response));
}
}
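
Finally, a usage sketch for the new helper in an integration test (cluster setup is abbreviated and the connector name is illustrative; per the herder tests above, the connector must be in the STOPPED state before its offsets can be reset):

EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
        .name("offsets-reset-cluster")
        .build();
connect.start();
// ... configure a connector named "test-connector", let it run, then stop it ...
String response = connect.resetConnectorOffsets("test-connector");
// On success the REST message reads: "The offsets for this connector have been reset successfully"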