diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3655ea01ba3..e3975a30c8c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -413,6 +413,7 @@
+
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index d1d1eecc530..c12246f139d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -61,6 +61,7 @@ import java.util.Map;
import java.util.regex.Pattern;
import static java.util.Collections.singleton;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
/**
* WorkerTask that uses a SinkTask to export data from Kafka.
@@ -77,6 +78,7 @@ class WorkerSinkTask extends WorkerTask {
private final HeaderConverter headerConverter;
private final TransformationChain transformationChain;
private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
+ private final boolean isTopicTrackingEnabled;
private KafkaConsumer consumer;
private WorkerSinkTaskContext context;
private final List messageBatch;
@@ -131,6 +133,7 @@ class WorkerSinkTask extends WorkerTask {
this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics);
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
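+ // Cache the topic tracking flag, which is checked once per record, to avoid repeated config lookups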
+ this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
}
@Override
@@ -505,7 +508,9 @@ class WorkerSinkTask extends WorkerTask {
headers);
log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
- recordActiveTopic(origRecord.topic());
+ if (isTopicTrackingEnabled) {
+ recordActiveTopic(origRecord.topic());
+ }
return transformationChain.apply(origRecord);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9e0c45f9155..da2f9ef7c92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -60,6 +60,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
+
/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
@@ -80,6 +82,7 @@ class WorkerSourceTask extends WorkerTask {
private final OffsetStorageWriter offsetWriter;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
private final AtomicReference producerSendException;
+ private final boolean isTopicTrackingEnabled;
private List toSend;
private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
@@ -137,6 +140,7 @@ class WorkerSourceTask extends WorkerTask {
this.stopRequestedLatch = new CountDownLatch(1);
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
this.producerSendException = new AtomicReference<>();
+ this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
}
@Override
@@ -356,7 +360,9 @@ class WorkerSourceTask extends WorkerTask {
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
commitTaskRecord(preTransformRecord, recordMetadata);
- recordActiveTopic(producerRecord.topic());
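+ // At this point the send has succeeded; record the destination topic as active unless topic tracking is disabled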
+ if (isTopicTrackingEnabled) {
+ recordActiveTopic(producerRecord.topic());
+ }
}
}
});
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index bfed9ecfd34..512c700d365 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -90,6 +90,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
@@ -150,6 +151,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final ExecutorService startAndStopExecutor;
private final WorkerGroupMember member;
private final AtomicBoolean stopping;
+ private final boolean isTopicTrackingEnabled;
// Track enough information about the current membership state to be able to determine which requests via the API
// and from other nodes are safe to process
@@ -158,7 +160,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private Set tasksToRestart = new HashSet<>();
private ExtendedAssignment assignment;
private boolean canReadConfigs;
- private ClusterConfigState configState;
+ // visible for testing
+ protected ClusterConfigState configState;
// To handle most external requests, like creating or destroying a connector, we can use a generic request where
// the caller specifies all the code that should be executed.
@@ -216,6 +219,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.keyRotationIntervalMs = config.getInt(DistributedConfig.INTER_WORKER_KEY_TTL_MS_CONFIG);
this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
this.keyGenerator = config.getInternalRequestKeyGenerator();
+ this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
@@ -1619,6 +1623,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
startAndStop(callables);
log.info("Finished stopping tasks in preparation for rebalance");
+ if (isTopicTrackingEnabled) {
+ // Send tombstones to reset active topics for removed connectors only after
+ // connectors and tasks have been stopped; otherwise these tombstones will be overwritten
+ resetActiveTopics(connectors, tasks);
+ }
+
// Ensure that all status updates have been pushed to the storage system before rebalancing.
// Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
// completes.
@@ -1628,6 +1638,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.info("Wasn't able to resume work after last rebalance, can skip stopping connectors and tasks");
}
}
+
+ private void resetActiveTopics(Collection connectors, Collection tasks) {
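+ // Connectors that are no longer present in the config snapshot were deleted rather
+ // than merely reassigned, so only their active topics are reset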
+ connectors.stream()
+ .filter(connectorName -> !configState.contains(connectorName))
+ .forEach(DistributedHerder.this::resetConnectorActiveTopics);
+ tasks.stream()
+ .map(ConnectorTaskId::connector)
+ .distinct()
+ .filter(connectorName -> !configState.contains(connectorName))
+ .forEach(DistributedHerder.this::resetConnectorActiveTopics);
+ }
}
class HerderMetrics {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 6d30eabd2ee..ffacf3a82f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -87,10 +87,14 @@ public class ConnectorsResource {
private final WorkerConfig config;
@javax.ws.rs.core.Context
private ServletContext context;
+ private final boolean isTopicTrackingDisabled;
+ private final boolean isTopicTrackingResetDisabled;
public ConnectorsResource(Herder herder, WorkerConfig config) {
this.herder = herder;
this.config = config;
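+ // Read both flags once at construction, since the worker config does not change at runtime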
+ isTopicTrackingDisabled = !config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+ isTopicTrackingResetDisabled = !config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG);
}
@GET
@@ -98,7 +102,7 @@ public class ConnectorsResource {
public Response listConnectors(
final @Context UriInfo uriInfo,
final @Context HttpHeaders headers
- ) throws Throwable {
+ ) {
if (uriInfo.getQueryParameters().containsKey("expand")) {
Map> out = new HashMap<>();
for (String connector : herder.connectors()) {
@@ -113,7 +117,7 @@ public class ConnectorsResource {
connectorExpansions.put("info", herder.connectorInfo(connector));
break;
default:
- log.info("Ignoring unknown expanion type {}", expansion);
+ log.info("Ignoring unknown expansion type {}", expansion);
}
}
out.put(connector, connectorExpansions);
@@ -174,17 +178,16 @@ public class ConnectorsResource {
@GET
@Path("/{connector}/status")
- public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
+ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) {
return herder.connectorStatus(connector);
}
@GET
@Path("/{connector}/topics")
- public Response getConnectorActiveTopics(final @PathParam("connector") String connector) throws Throwable {
- if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) {
- return Response.status(Response.Status.FORBIDDEN)
- .entity("Topic tracking is disabled.")
- .build();
+ public Response getConnectorActiveTopics(final @PathParam("connector") String connector) {
+ if (isTopicTrackingDisabled) {
+ throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
+ "Topic tracking is disabled.");
}
ActiveTopicsInfo info = herder.connectorActiveTopics(connector);
return Response.ok(Collections.singletonMap(info.connector(), info)).build();
@@ -192,16 +195,14 @@ public class ConnectorsResource {
@PUT
@Path("/{connector}/topics/reset")
- public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) throws Throwable {
- if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) {
- return Response.status(Response.Status.FORBIDDEN)
- .entity("Topic tracking is disabled.")
- .build();
+ public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) {
+ if (isTopicTrackingDisabled) {
+ throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
+ "Topic tracking is disabled.");
}
- if (!config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)) {
- return Response.status(Response.Status.FORBIDDEN)
- .entity("Topic tracking reset is disabled.")
- .build();
+ if (isTopicTrackingResetDisabled) {
+ throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
+ "Topic tracking reset is disabled.");
}
herder.resetConnectorActiveTopics(connector);
return Response.accepted().build();
@@ -280,7 +281,7 @@ public class ConnectorsResource {
@Path("/{connector}/tasks/{task}/status")
public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
- final @PathParam("task") Integer task) throws Throwable {
+ final @PathParam("task") Integer task) {
return herder.taskStatus(new ConnectorTaskId(connector, task));
}
@@ -306,8 +307,8 @@ public class ConnectorsResource {
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
}
- // Check whether the connector name from the url matches the one (if there is one) provided in the connectorconfig
- // object. Throw BadRequestException on mismatch, otherwise put connectorname in config
+ // Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
+ // object. Throw BadRequestException on mismatch, otherwise put connectorName in config
private void checkAndPutConnectorConfigName(String connectorName, Map connectorConfig) {
String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG);
if (includedName != null) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 6b1535081a3..c8eace72934 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -85,10 +85,10 @@ import java.util.concurrent.ConcurrentMap;
public class KafkaStatusBackingStore implements StatusBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
- private static final String TASK_STATUS_PREFIX = "status-task-";
- private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
- private static final String TOPIC_STATUS_PREFIX = "status-topic-";
- private static final String TOPIC_STATUS_SEPARATOR = ":connector-";
+ public static final String TASK_STATUS_PREFIX = "status-task-";
+ public static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
+ public static final String TOPIC_STATUS_PREFIX = "status-topic-";
+ public static final String TOPIC_STATUS_SEPARATOR = ":connector-";
public static final String STATE_KEY_NAME = "state";
public static final String TRACE_KEY_NAME = "trace";
@@ -122,9 +122,9 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
private final Time time;
private final Converter converter;
- private final Table> tasks;
- private final Map> connectors;
//visible for testing
+ protected final Table> tasks;
+ protected final Map> connectors;
protected final ConcurrentMap> topics;
private String statusTopic;
@@ -435,7 +435,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
}
}
- private TopicStatus parseTopicStatus(byte[] data) {
+ protected TopicStatus parseTopicStatus(byte[] data) {
try {
SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data);
if (!(schemaAndValue.value() instanceof Map)) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
index bf7ca157612..1b7131a5144 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
@@ -62,4 +62,7 @@ public class Table {
return Collections.unmodifiableMap(columns);
}
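+ /** Returns true if this table contains no rows. */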
+ public boolean isEmpty() {
+ return table.isEmpty();
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
new file mode 100644
index 00000000000..acfc8967058
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for the endpoints that offer topic tracking of a connector's active
+ * topics.
+ */
+@Category(IntegrationTest.class)
+public class ConnectorTopicsIntegrationTest {
+
+ private static final int NUM_WORKERS = 5;
+ private static final int NUM_TASKS = 1;
+ private static final String FOO_TOPIC = "foo-topic";
+ private static final String FOO_CONNECTOR = "foo-source";
+ private static final String BAR_TOPIC = "bar-topic";
+ private static final String BAR_CONNECTOR = "bar-source";
+ private static final String SINK_CONNECTOR = "baz-sink";
+ private static final int NUM_TOPIC_PARTITIONS = 3;
+
+ private EmbeddedConnectCluster.Builder connectBuilder;
+ private EmbeddedConnectCluster connect;
+ Map workerProps = new HashMap<>();
+ Properties brokerProps = new Properties();
+
+ @Before
+ public void setup() {
+ // set up Connect worker properties
+ workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
+
+ // set up Kafka broker properties
+ brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+ // build a Connect cluster backed by Kafka and Zk
+ connectBuilder = new EmbeddedConnectCluster.Builder()
+ .name("connect-cluster")
+ .numWorkers(NUM_WORKERS)
+ .workerProps(workerProps)
+ .brokerProps(brokerProps)
+ .maskExitProcedures(true); // true is the default, set here explicitly as an example
+ }
+
+ @After
+ public void close() {
+ // stop all Connect, Kafka and Zk threads.
+ connect.stop();
+ }
+
+ @Test
+ public void testGetActiveTopics() throws InterruptedException {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ // create test topics
+ connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
+ connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
+
+ connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
+ "Active topic set is not empty for connector: " + FOO_CONNECTOR);
+
+ // start a source connector
+ connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
+ "Connector tasks did not start in time.");
+
+ connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.singletonList(FOO_TOPIC),
+ "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + FOO_CONNECTOR);
+
+ // start another source connector
+ connect.configureConnector(BAR_CONNECTOR, defaultSourceConnectorProps(BAR_TOPIC));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(BAR_CONNECTOR, NUM_TASKS,
+ "Connector tasks did not start in time.");
+
+ connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.singletonList(BAR_TOPIC),
+ "Active topic set is not: " + Collections.singletonList(BAR_TOPIC) + " for connector: " + BAR_CONNECTOR);
+
+ // start a sink connector
+ connect.configureConnector(SINK_CONNECTOR, defaultSinkConnectorProps(FOO_TOPIC, BAR_TOPIC));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, NUM_TASKS,
+ "Connector tasks did not start in time.");
+
+ connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC, BAR_TOPIC),
+ "Active topic set is not: " + Arrays.asList(FOO_TOPIC, BAR_TOPIC) + " for connector: " + SINK_CONNECTOR);
+
+ // deleting a connector resets its active topics
+ connect.deleteConnector(BAR_CONNECTOR);
+
+ connect.assertions().assertConnectorAndTasksAreStopped(BAR_CONNECTOR,
+ "Connector tasks did not stop in time.");
+
+ connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(),
+ "Active topic set is not empty for deleted connector: " + BAR_CONNECTOR);
+
+ // Unfortunately there's currently no easy way to know when the consumer has caught up with
+ // the last records that the producer of the stopped connector managed to produce.
+ // Repeated runs show that this amount of time is sufficient for the consumer to catch up.
+ Thread.sleep(5000);
+
+ // reset active topics for the sink connector after one of the topics has become idle
+ connect.resetConnectorTopics(SINK_CONNECTOR);
+
+ connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC),
+ "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
+ }
+
+ @Test
+ public void testTopicTrackingResetIsDisabled() throws InterruptedException {
+ workerProps.put(TOPIC_TRACKING_ALLOW_RESET_CONFIG, "false");
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ // create test topics
+ connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
+ connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
+
+ connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
+ "Active topic set is not empty for connector: " + FOO_CONNECTOR);
+
+ // start a source connector
+ connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
+ "Connector tasks did not start in time.");
+
+ connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.singletonList(FOO_TOPIC),
+ "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + FOO_CONNECTOR);
+
+ // start a sink connector
+ connect.configureConnector(SINK_CONNECTOR, defaultSinkConnectorProps(FOO_TOPIC));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, NUM_TASKS,
+ "Connector tasks did not start in time.");
+
+ connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC),
+ "Active topic set is not: " + Arrays.asList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
+
+ // deleting a connector resets its active topics
+ connect.deleteConnector(FOO_CONNECTOR);
+
+ connect.assertions().assertConnectorAndTasksAreStopped(FOO_CONNECTOR,
+ "Connector tasks did not stop in time.");
+
+ connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
+ "Active topic set is not empty for deleted connector: " + FOO_CONNECTOR);
+
+ // Unfortunately there's currently no easy way to know when the consumer has caught up with
+ // the last records that the producer of the stopped connector managed to produce.
+ // Repeated runs show that this amount of time is sufficient for the consumer to catch up.
+ Thread.sleep(5000);
+
+ // resetting active topics for the sink connector won't work when the config is disabled
+ Exception e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorTopics(SINK_CONNECTOR));
+ assertTrue(e.getMessage().contains("Topic tracking reset is disabled."));
+
+ connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC),
+ "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
+ }
+
+ @Test
+ public void testTopicTrackingIsDisabled() throws InterruptedException {
+ workerProps.put(TOPIC_TRACKING_ENABLE_CONFIG, "false");
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ // create test topics
+ connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
+ connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
+
+ // start a source connector
+ connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
+ "Connector tasks did not start in time.");
+
+ // when topic tracking is disabled, both resetting and getting active topics are rejected
+ Exception e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorTopics(SINK_CONNECTOR));
+ assertTrue(e.getMessage().contains("Topic tracking is disabled."));
+
+ e = assertThrows(ConnectRestException.class, () -> connect.connectorTopics(SINK_CONNECTOR));
+ assertTrue(e.getMessage().contains("Topic tracking is disabled."));
+
+ // Wait for tasks to produce a few records
+ Thread.sleep(5000);
+
+ assertNoTopicStatusInStatusTopic();
+ }
+
+ public void assertNoTopicStatusInStatusTopic() {
+ String statusTopic = workerProps.get(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
+ Consumer verifiableConsumer = connect.kafka().createConsumer(
+ Collections.singletonMap("group.id", "verifiable-consumer-group-0"));
+
+ List partitions =
+ Optional.ofNullable(verifiableConsumer.partitionsFor(statusTopic))
+ .orElseThrow(() -> new AssertionError("Unable to retrieve partitions info for status topic"))
+ .stream()
+ .map(info -> new TopicPartition(info.topic(), info.partition()))
+ .collect(Collectors.toList());
+ verifiableConsumer.assign(partitions);
+
+ // Based on the implementation of {@link org.apache.kafka.connect.util.KafkaBasedLog#readToLogEnd}
+ Set assignment = verifiableConsumer.assignment();
+ verifiableConsumer.seekToBeginning(assignment);
+ Map endOffsets = verifiableConsumer.endOffsets(assignment);
+ while (!endOffsets.isEmpty()) {
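+ // Keep polling until the consumer's position reaches the end offset of every assigned partition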
+ Iterator> it = endOffsets.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry entry = it.next();
+ if (verifiableConsumer.position(entry.getKey()) >= entry.getValue())
+ it.remove();
+ else {
+ try {
+ StreamSupport.stream(verifiableConsumer.poll(Duration.ofMillis(Integer.MAX_VALUE)).spliterator(), false)
+ .map(ConsumerRecord::key)
+ .filter(Objects::nonNull)
+ .filter(key -> new String(key, StandardCharsets.UTF_8).startsWith(KafkaStatusBackingStore.TOPIC_STATUS_PREFIX))
+ .findFirst()
+ .ifPresent(key -> {
+ throw new AssertionError("Found unexpected key: " + new String(key, StandardCharsets.UTF_8) + " in status topic");
+ });
+ } catch (KafkaException e) {
+ throw new AssertionError("Error while reading to the end of status topic", e);
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ private Map defaultSourceConnectorProps(String topic) {
+ // set up props for the source connector
+ Map props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+ props.put(TOPIC_CONFIG, topic);
+ props.put("throughput", String.valueOf(10));
+ props.put("messages.per.poll", String.valueOf(10));
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ return props;
+ }
+
+ private Map defaultSinkConnectorProps(String... topics) {
+ // set up props for the sink connector
+ Map props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+ props.put(TOPICS_CONFIG, String.join(",", topics));
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ return props;
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
index ed6f6b96f98..05b2dfdd886 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
@@ -77,6 +77,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
@Override
public void stop() {
+ log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
connectorHandle.recordConnectorStop();
}
@@ -153,6 +154,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
@Override
public void stop() {
+ log.info("Stopped {} task {}", this.getClass().getSimpleName(), taskId);
taskHandle.recordTaskStop();
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 74693f890e2..08118fea29b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
@@ -74,6 +75,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -118,21 +120,24 @@ public class DistributedHerderTest {
private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
private static final Integer MAX_TASKS = 3;
private static final Map CONN1_CONFIG = new HashMap<>();
+ private static final String FOO_TOPIC = "foo";
+ private static final String BAR_TOPIC = "bar";
+ private static final String BAZ_TOPIC = "baz";
static {
CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
- CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", FOO_TOPIC, BAR_TOPIC));
CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
static {
- CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
+ CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", FOO_TOPIC, BAR_TOPIC, BAZ_TOPIC));
}
private static final Map CONN2_CONFIG = new HashMap<>();
static {
CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
- CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", FOO_TOPIC, BAR_TOPIC));
CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map TASK_CONFIG = new HashMap<>();
@@ -361,7 +366,6 @@ public class DistributedHerderTest {
member.requestRejoin();
PowerMock.expectLastCall();
- expectPostRebalanceCatchup(SNAPSHOT);
// In the second rebalance the new member gets its assignment and this member has no
// assignments or revocations
@@ -372,6 +376,7 @@ public class DistributedHerderTest {
PowerMock.replayAll();
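+ // Assign the config snapshot directly instead of mocking a post-rebalance catchup read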
+ herder.configState = SNAPSHOT;
time.sleep(1000L);
assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
herder.tick();
@@ -771,15 +776,37 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
- // No immediate action besides this -- change will be picked up via the config log
+ // The change is eventually reflected in the config topic, and the deleted connector
+ // and its tasks are revoked
+ member.wakeup();
+ PowerMock.expectLastCall();
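+ // Each topic tombstone is expected twice: once when resetting the revoked connector
+ // and once for the connector of its revoked task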
+ TopicStatus fooStatus = new TopicStatus(FOO_TOPIC, CONN1, 0, time.milliseconds());
+ TopicStatus barStatus = new TopicStatus(BAR_TOPIC, CONN1, 0, time.milliseconds());
+ EasyMock.expect(statusBackingStore.getAllTopics(EasyMock.eq(CONN1))).andReturn(new HashSet<>(Arrays.asList(fooStatus, barStatus))).times(2);
+ statusBackingStore.deleteTopic(EasyMock.eq(CONN1), EasyMock.eq(FOO_TOPIC));
+ PowerMock.expectLastCall().times(2);
+ statusBackingStore.deleteTopic(EasyMock.eq(CONN1), EasyMock.eq(BAR_TOPIC));
+ PowerMock.expectLastCall().times(2);
+ expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1),
+ ConnectProtocol.Assignment.NO_ERROR, 2,
+ Collections.emptyList(), Collections.emptyList(), 0);
+ expectPostRebalanceCatchup(ClusterConfigState.EMPTY);
+ member.requestRejoin();
+ PowerMock.expectLastCall();
PowerMock.replayAll();
herder.deleteConnectorConfig(CONN1, putConnectorCallback);
herder.tick();
time.sleep(1000L);
- assertStatistics(3, 1, 100, 1000L);
+ assertStatistics("leaderUrl", false, 3, 1, 100, 1000L);
+
+ configUpdateListener.onConnectorConfigRemove(CONN1); // read updated config that removes the connector
+ herder.configState = ClusterConfigState.EMPTY;
+ herder.tick();
+ time.sleep(1000L);
+ assertStatistics("leaderUrl", true, 3, 1, 100, 2100L);
PowerMock.verifyAll();
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 967b8fca07b..4a00b083de3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -31,11 +31,13 @@ import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
@@ -54,6 +56,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
@@ -67,8 +70,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
@RunWith(PowerMockRunner.class)
@PrepareForTest(RestClient.class)
@@ -130,16 +137,26 @@ public class ConnectorsResourceTest {
TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
}
+ private static final Set CONNECTOR_ACTIVE_TOPICS = new HashSet<>(
+ Arrays.asList("foo_topic", "bar_topic"));
+ private static final Set CONNECTOR2_ACTIVE_TOPICS = new HashSet<>(
+ Arrays.asList("foo_topic", "baz_topic"));
+
@Mock
private Herder herder;
private ConnectorsResource connectorsResource;
private UriInfo forward;
+ @Mock
+ private WorkerConfig workerConfig;
@Before
public void setUp() throws NoSuchMethodException {
PowerMock.mockStatic(RestClient.class,
RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class));
- connectorsResource = new ConnectorsResource(herder, null);
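+ // By default both topic tracking configs are enabled; individual tests reset the mock to override them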
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
+ PowerMock.replay(workerConfig);
+ connectorsResource = new ConnectorsResource(herder, workerConfig);
forward = EasyMock.mock(UriInfo.class);
MultivaluedMap queryParams = new MultivaluedHashMap<>();
queryParams.putSingle("forward", "true");
@@ -434,8 +451,8 @@ public class ConnectorsResourceTest {
herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
- EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, null))
- .andReturn(new RestClient.HttpResponse<>(204, new HashMap(), null));
+ EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, workerConfig))
+ .andReturn(new RestClient.HttpResponse<>(204, new HashMap<>(), null));
PowerMock.replayAll();
@@ -803,6 +820,90 @@ public class ConnectorsResourceTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
+ PowerMock.reset(workerConfig);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false);
+ PowerMock.replay(workerConfig);
+ connectorsResource = new ConnectorsResource(herder, workerConfig);
+ PowerMock.replayAll();
+
+ Exception e = assertThrows(ConnectRestException.class,
+ () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
+ assertEquals("Topic tracking is disabled.", e.getMessage());
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() {
+ PowerMock.reset(workerConfig);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
+ HttpHeaders headers = EasyMock.mock(HttpHeaders.class);
+ PowerMock.replay(workerConfig);
+ connectorsResource = new ConnectorsResource(herder, workerConfig);
+ PowerMock.replayAll();
+
+ Exception e = assertThrows(ConnectRestException.class,
+ () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
+ assertEquals("Topic tracking is disabled.", e.getMessage());
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() {
+ PowerMock.reset(workerConfig);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false);
+ HttpHeaders headers = EasyMock.mock(HttpHeaders.class);
+ PowerMock.replay(workerConfig);
+ connectorsResource = new ConnectorsResource(herder, workerConfig);
+ PowerMock.replayAll();
+
+ Exception e = assertThrows(ConnectRestException.class,
+ () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
+ assertEquals("Topic tracking reset is disabled.", e.getMessage());
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testConnectorActiveTopics() {
+ PowerMock.reset(workerConfig);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
+ EasyMock.expect(herder.connectorActiveTopics(CONNECTOR_NAME))
+ .andReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS));
+ PowerMock.replay(workerConfig);
+ connectorsResource = new ConnectorsResource(herder, workerConfig);
+ PowerMock.replayAll();
+
+ Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
+ assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+ Map> body = (Map>) response.getEntity();
+ assertEquals(CONNECTOR_NAME, ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).connector());
+ assertEquals(new HashSet<>(CONNECTOR_ACTIVE_TOPICS),
+ ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).topics());
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testResetConnectorActiveTopics() {
+ PowerMock.reset(workerConfig);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
+ EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
+ HttpHeaders headers = EasyMock.mock(HttpHeaders.class);
+ herder.resetConnectorActiveTopics(CONNECTOR_NAME);
+ EasyMock.expectLastCall();
+ PowerMock.replay(workerConfig);
+ connectorsResource = new ConnectorsResource(herder, workerConfig);
+ PowerMock.replayAll();
+
+ Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
+ assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
+ PowerMock.verifyAll();
+ }
+
private byte[] serializeAsBytes(final T value) throws IOException {
return new ObjectMapper().writeValueAsBytes(value);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
index aac7cb0c3b9..460fb0d6c38 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
@@ -22,7 +22,9 @@ import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
+import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.io.IOException;
@@ -35,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@RunWith(PowerMockRunner.class)
public class FileOffsetBackingStoreTest {
FileOffsetBackingStore store;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java
index 6bb4a1db702..53f481ac9a2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java
@@ -17,46 +17,151 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.easymock.Capture;
import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
+import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.CONNECTOR_STATUS_PREFIX;
+import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TASK_STATUS_PREFIX;
+import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TOPIC_STATUS_PREFIX;
+import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TOPIC_STATUS_SEPARATOR;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@RunWith(PowerMockRunner.class)
public class KafkaStatusBackingStoreFormatTest extends EasyMockSupport {
private static final String STATUS_TOPIC = "status-topic";
+ private static final String FOO_TOPIC = "foo-topic";
+ private static final String FOO_CONNECTOR = "foo-source";
+ private static final String BAR_TOPIC = "bar-topic";
- private JsonConverter converter;
+ private Time time;
private KafkaStatusBackingStore store;
+ private JsonConverter converter;
+ @Mock
+ private KafkaBasedLog kafkaBasedLog;
@Before
public void setup() {
+ time = new MockTime();
converter = new JsonConverter();
converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);
- store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, null);
+ store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+ }
+
+ @Test
+ public void readInvalidStatus() {
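+ // Malformed status keys of every kind must be ignored without mutating any cached state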
+ String key = "status-unknown";
+ byte[] value = new byte[0];
+ ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.connectors().isEmpty());
+ assertTrue(store.tasks.isEmpty());
+ assertTrue(store.topics.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.connectors().isEmpty());
+ assertTrue(store.tasks.isEmpty());
+ assertTrue(store.topics.isEmpty());
+
+ key = CONNECTOR_STATUS_PREFIX;
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.connectors().isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.connectors().isEmpty());
+
+ key = TASK_STATUS_PREFIX;
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.tasks.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.tasks.isEmpty());
+
+ key = TASK_STATUS_PREFIX + FOO_CONNECTOR + "-#";
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.tasks.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.tasks.isEmpty());
+
+ key = TOPIC_STATUS_PREFIX;
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.topics.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.topics.isEmpty());
+
+ key = TOPIC_STATUS_PREFIX + TOPIC_STATUS_SEPARATOR;
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.topics.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.topics.isEmpty());
+
+ key = TOPIC_STATUS_PREFIX + FOO_TOPIC + ":";
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.topics.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.topics.isEmpty());
+
+ key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR;
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.topics.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.topics.isEmpty());
+ }
+
+ @Test
+ public void readInvalidStatusValue() {
+ String key = CONNECTOR_STATUS_PREFIX + FOO_CONNECTOR;
+ byte[] value = "invalid".getBytes();
+ ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.connectors().isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.connectors().isEmpty());
+
+ key = TASK_STATUS_PREFIX + FOO_CONNECTOR + "-0";
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.tasks.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.tasks.isEmpty());
+
+ key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
+ statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
+ assertTrue(store.topics.isEmpty());
+ store.read(statusRecord);
+ assertTrue(store.topics.isEmpty());
}
@Test
public void readTopicStatus() {
- TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds());
+ TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), Time.SYSTEM.milliseconds());
+ String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
byte[] value = store.serializeTopicStatus(topicStatus);
- ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", value);
+ ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
store.read(statusRecord);
- assertTrue(store.topics.containsKey("bar"));
- assertTrue(store.topics.get("bar").containsKey("foo"));
- assertEquals(topicStatus, store.topics.get("bar").get("foo"));
+ assertTrue(store.topics.containsKey(FOO_CONNECTOR));
+ assertTrue(store.topics.get(FOO_CONNECTOR).containsKey(FOO_TOPIC));
+ assertEquals(topicStatus, store.topics.get(FOO_CONNECTOR).get(FOO_TOPIC));
}
@Test
@@ -66,10 +171,132 @@ public class KafkaStatusBackingStoreFormatTest extends EasyMockSupport {
assertTrue(store.topics.containsKey("bar"));
assertTrue(store.topics.get("bar").containsKey("foo"));
assertEquals(topicStatus, store.topics.get("bar").get("foo"));
- ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", null);
+ // serializing a null status yields a null value, i.e. a tombstone record
+ byte[] value = store.serializeTopicStatus(null);
+ ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", value);
store.read(statusRecord);
assertTrue(store.topics.containsKey("bar"));
assertFalse(store.topics.get("bar").containsKey("foo"));
assertEquals(Collections.emptyMap(), store.topics.get("bar"));
}
+
+ @Test
+ public void putTopicState() {
+ TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
+ String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
+ Capture valueCapture = newCapture();
+ Capture callbackCapture = newCapture();
+ kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(() -> {
+ callbackCapture.getValue().onCompletion(null, null);
+ return null;
+ });
+ replayAll();
+
+ store.put(topicStatus);
+ // check capture state
+ assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
+ // state is not visible until read back from the log
+ assertEquals(null, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
+
+ ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, valueCapture.getValue());
+ store.read(statusRecord);
+ assertEquals(topicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
+ assertEquals(new HashSet<>(Collections.singletonList(topicStatus)), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putTopicStateRetriableFailure() {
+ TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
+ String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
+ Capture valueCapture = newCapture();
+ Capture callbackCapture = newCapture();
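+ // Expect two sends: the first fails with a retriable TimeoutException and triggers a retry,
+ // and the second completes successfully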
+ kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(() -> {
+ callbackCapture.getValue().onCompletion(null, new TimeoutException());
+ return null;
+ })
+ .andAnswer(() -> {
+ callbackCapture.getValue().onCompletion(null, null);
+ return null;
+ });
+
+ replayAll();
+ store.put(topicStatus);
+
+ // check capture state
+ assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
+ // state is not visible until read back from the log
+ assertEquals(null, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putTopicStateNonRetriableFailure() {
+ TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
+ String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
+ Capture valueCapture = newCapture();
+ Capture callbackCapture = newCapture();
+ kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(() -> {
+ callbackCapture.getValue().onCompletion(null, new UnknownServerException());
+ return null;
+ });
+
+ replayAll();
+
+ // the error is logged and ignored
+ store.put(topicStatus);
+
+ // check capture state
+ assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
+ // state is not visible until read back from the log
+ assertEquals(null, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putTopicStateShouldOverridePreviousState() {
+ TopicStatus firstTopicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR,
+ 0), time.milliseconds());
+ time.sleep(1000);
+ TopicStatus secondTopicStatus = new TopicStatus(BAR_TOPIC, new ConnectorTaskId(FOO_CONNECTOR,
+ 0), time.milliseconds());
+ String firstKey = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
+ String secondKey = TOPIC_STATUS_PREFIX + BAR_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
+ Capture valueCapture = newCapture();
+ Capture callbackCapture = newCapture();
+ kafkaBasedLog.send(eq(secondKey), capture(valueCapture), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(() -> {
+ callbackCapture.getValue().onCompletion(null, null);
+ // The second status record is read soon after it's persisted in the status topic
+ ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, secondKey, valueCapture.getValue());
+ store.read(statusRecord);
+ return null;
+ });
+ replayAll();
+
+ byte[] value = store.serializeTopicStatus(firstTopicStatus);
+ ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, firstKey, value);
+ store.read(statusRecord);
+ store.put(secondTopicStatus);
+
+ // check capture state
+ assertEquals(secondTopicStatus, store.parseTopicStatus(valueCapture.getValue()));
+ assertEquals(firstTopicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
+ assertEquals(secondTopicStatus, store.getTopic(FOO_CONNECTOR, BAR_TOPIC));
+ assertEquals(new HashSet<>(Arrays.asList(firstTopicStatus, secondTopicStatus)),
+ new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
+
+ verifyAll();
+ }
+
}
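
Note on the two failure-mode tests above: putTopicStateRetriableFailure and putTopicStateNonRetriableFailure pin down the send-callback contract of the store. A retriable error such as TimeoutException causes the record to be sent again, a non-retriable error such as UnknownServerException is logged and dropped, and in both cases the new state only becomes visible once the record is read back from the status topic. The following is a minimal, self-contained sketch of that contract, not the store's actual code; the Resend hook and the System.err logging are hypothetical stand-ins.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.common.errors.RetriableException;

    public class TopicStatusCallbackSketch {

        // Hypothetical stand-in for "send the same key/value again".
        interface Resend {
            void run();
        }

        static Callback statusCallback(Resend resend) {
            return (metadata, exception) -> {
                if (exception == null) {
                    // Success: the state still only becomes visible once the
                    // record is read back from the status topic.
                    return;
                }
                if (exception instanceof RetriableException) {
                    // e.g. TimeoutException: retry the send.
                    resend.run();
                } else {
                    // e.g. UnknownServerException: log the error and move on.
                    System.err.println("Failed to write topic status: " + exception);
                }
            };
        }
    }
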
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index e2f5a406c19..bd8e7a512a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.record.TimestampType;
@@ -27,15 +28,23 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
+import org.easymock.Mock;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import static org.easymock.EasyMock.anyObject;
@@ -45,8 +54,11 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
@SuppressWarnings("unchecked")
+@RunWith(PowerMockRunner.class)
public class KafkaStatusBackingStoreTest extends EasyMockSupport {
private static final String STATUS_TOPIC = "status-topic";
@@ -54,12 +66,34 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
private static final String CONNECTOR = "conn";
private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);
+ private KafkaStatusBackingStore store;
+ @Mock
+ Converter converter;
+ @Mock
+ private KafkaBasedLog<String, byte[]> kafkaBasedLog;
+ @Mock
+ WorkerConfig workerConfig;
+
+ @Before
+ public void setup() {
+ store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+ }
+
+ @Test
+ public void misconfigurationOfStatusBackingStore() {
+ expect(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).andReturn(null);
+ expect(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).andReturn(" ");
+ replayAll();
+
+ Exception e = assertThrows(ConfigException.class, () -> store.configure(workerConfig));
+ assertEquals("Must specify topic for connector status.", e.getMessage());
+ e = assertThrows(ConfigException.class, () -> store.configure(workerConfig));
+ assertEquals("Must specify topic for connector status.", e.getMessage());
+ verifyAll();
+ }
+
@Test
public void putConnectorState() {
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@@ -87,10 +121,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putConnectorStateRetriableFailure() {
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@@ -125,10 +155,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putConnectorStateNonRetriableFailure() {
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@@ -160,10 +186,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
byte[] value = new byte[0];
String otherWorkerId = "anotherhost:8083";
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
// the persisted state came from a different host and has a newer generation
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", otherWorkerId);
@@ -188,10 +210,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putSafeWithNoPreviousValueIsPropagated() {
- final Converter converter = mock(Converter.class);
- final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
final byte[] value = new byte[0];
final Capture<Struct> statusValueStruct = newCapture();
@@ -217,10 +235,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
public void putSafeOverridesValueSetBySameWorker() {
final byte[] value = new byte[0];
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
// the persisted state came from the same host, but has a newer generation
Map<String, Object> firstStatusRead = new HashMap<>();
firstStatusRead.put("worker_id", WORKER_ID);
@@ -267,10 +281,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
final byte[] value = new byte[0];
String otherWorkerId = "anotherhost:8083";
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
// the persisted state came from a different host and has a newer generation
Map<String, Object> firstStatusRead = new HashMap<>();
firstStatusRead.put("worker_id", otherWorkerId);
@@ -315,10 +325,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
public void readConnectorState() {
byte[] value = new byte[0];
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
@@ -339,10 +345,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putTaskState() {
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@@ -372,10 +374,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
public void readTaskState() {
byte[] value = new byte[0];
- KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
- Converter converter = mock(Converter.class);
- KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
-
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
@@ -394,6 +392,69 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
verifyAll();
}
+ @Test
+ public void deleteConnectorState() {
+ final byte[] value = new byte[0];
+ Map<String, Object> statusMap = new HashMap<>();
+ statusMap.put("worker_id", WORKER_ID);
+ statusMap.put("state", "RUNNING");
+ statusMap.put("generation", 0L);
+
+ converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class));
+ EasyMock.expectLastCall().andReturn(value);
+ kafkaBasedLog.send(eq("status-connector-" + CONNECTOR), eq(value), anyObject(Callback.class));
+ expectLastCall();
+
+ converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class));
+ EasyMock.expectLastCall().andReturn(value);
+ kafkaBasedLog.send(eq("status-task-conn-0"), eq(value), anyObject(Callback.class));
+ expectLastCall();
+
+ expect(converter.toConnectData(STATUS_TOPIC, value)).andReturn(new SchemaAndValue(null, statusMap));
+
+ replayAll();
+
+ ConnectorStatus connectorStatus = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+ store.put(connectorStatus);
+ TaskStatus taskStatus = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+ store.put(taskStatus);
+ store.read(consumerRecord(0, "status-task-conn-0", value));
+
+ assertEquals(new HashSet<>(Collections.singletonList(CONNECTOR)), store.connectors());
+ assertEquals(new HashSet<>(Collections.singletonList(taskStatus)), new HashSet<>(store.getAll(CONNECTOR)));
+ store.read(consumerRecord(0, "status-connector-conn", null));
+ assertTrue(store.connectors().isEmpty());
+ assertTrue(store.getAll(CONNECTOR).isEmpty());
+ verifyAll();
+ }
+
+ @Test
+ public void deleteTaskState() {
+ final byte[] value = new byte[0];
+ Map<String, Object> statusMap = new HashMap<>();
+ statusMap.put("worker_id", WORKER_ID);
+ statusMap.put("state", "RUNNING");
+ statusMap.put("generation", 0L);
+
+ converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class));
+ EasyMock.expectLastCall().andReturn(value);
+ kafkaBasedLog.send(eq("status-task-conn-0"), eq(value), anyObject(Callback.class));
+ expectLastCall();
+
+ expect(converter.toConnectData(STATUS_TOPIC, value)).andReturn(new SchemaAndValue(null, statusMap));
+
+ replayAll();
+
+ TaskStatus taskStatus = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+ store.put(taskStatus);
+ store.read(consumerRecord(0, "status-task-conn-0", value));
+
+ assertEquals(new HashSet<>(Collections.singletonList(taskStatus)), new HashSet<>(store.getAll(CONNECTOR)));
+ store.read(consumerRecord(0, "status-task-conn-0", null));
+ assertTrue(store.getAll(CONNECTOR).isEmpty());
+ verifyAll();
+ }
+
private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
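
The deleteConnectorState and deleteTaskState tests above rely on Kafka tombstones: reading a record whose value is null removes the corresponding status entry, while a non-null value is deserialized and stored. Below is a minimal sketch of that dispatch, assuming a flat in-memory map; the real store keeps richer per-connector structures, so this is an illustration, not its implementation.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class TombstoneReadSketch {

        // Simplified stand-in for the store's internal status cache.
        private final Map<String, byte[]> statusCache = new ConcurrentHashMap<>();

        void read(String key, byte[] value) {
            if (value == null) {
                // Tombstone: drop the cached status for this key.
                statusCache.remove(key);
            } else {
                // Normal record: deserialize (elided here) and replace the entry.
                statusCache.put(key, value);
            }
        }
    }
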
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 961b1d840c6..ee983fba411 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.connect.util.clusters;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -36,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -355,6 +358,52 @@ public class EmbeddedConnectCluster {
"Could not read connector state. Error response: " + responseToString(response));
}
+ /**
+ * Get the active topics of a connector running in this cluster.
+ *
+ * @param connectorName name of the connector
+ * @return an instance of {@link ActiveTopicsInfo} populated with the connector name and the set of topics it is actively using.
+ * @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code.
+ * @throws ConnectException for any other error.
+ */
+ public ActiveTopicsInfo connectorTopics(String connectorName) {
+ ObjectMapper mapper = new ObjectMapper();
+ String url = endpointForResource(String.format("connectors/%s/topics", connectorName));
+ Response response = requestGet(url);
+ try {
+ if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+ Map<String, Map<String, List<String>>> activeTopics = mapper
+ .readerFor(new TypeReference