KAFKA-9556; Fix two issues with KIP-558 and expand testing coverage (#8085)

Correct the Connect worker logic to properly disable the new topic status (KIP-558) feature when `topic.tracking.enable=false`, and fix automatic topic status reset after a connector is deleted.

Also adds new `ConnectorTopicsIntegrationTest` and expanded unit tests.

Reviewers: Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Konstantine Karantasis 2020-02-14 14:34:34 -08:00 committed by GitHub
parent 8d0b069b0f
commit 16ee326755
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 957 additions and 87 deletions

View File

@ -413,6 +413,7 @@
<allow pkg="kafka.utils" />
<allow class="javax.servlet.http.HttpServletResponse" />
<allow class="javax.ws.rs.core.Response" />
<allow pkg="com.fasterxml.jackson.core.type" />
</subpackage>
</subpackage>

View File

@ -61,6 +61,7 @@ import java.util.Map;
import java.util.regex.Pattern;
import static java.util.Collections.singleton;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
/**
* WorkerTask that uses a SinkTask to export data from Kafka.
@ -77,6 +78,7 @@ class WorkerSinkTask extends WorkerTask {
private final HeaderConverter headerConverter;
private final TransformationChain<SinkRecord> transformationChain;
private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
private final boolean isTopicTrackingEnabled;
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
private final List<SinkRecord> messageBatch;
@ -131,6 +133,7 @@ class WorkerSinkTask extends WorkerTask {
this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics);
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
}
@Override
@ -505,7 +508,9 @@ class WorkerSinkTask extends WorkerTask {
headers);
log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
if (isTopicTrackingEnabled) {
recordActiveTopic(origRecord.topic());
}
return transformationChain.apply(origRecord);
}

View File

@ -60,6 +60,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
@ -80,6 +82,7 @@ class WorkerSourceTask extends WorkerTask {
private final OffsetStorageWriter offsetWriter;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
private final AtomicReference<Exception> producerSendException;
private final boolean isTopicTrackingEnabled;
private List<SourceRecord> toSend;
private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
@ -137,6 +140,7 @@ class WorkerSourceTask extends WorkerTask {
this.stopRequestedLatch = new CountDownLatch(1);
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
this.producerSendException = new AtomicReference<>();
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
}
@Override
@ -356,9 +360,11 @@ class WorkerSourceTask extends WorkerTask {
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
commitTaskRecord(preTransformRecord, recordMetadata);
if (isTopicTrackingEnabled) {
recordActiveTopic(producerRecord.topic());
}
}
}
});
lastSendFailed = false;
} catch (org.apache.kafka.common.errors.RetriableException e) {

View File

@ -90,6 +90,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
@ -150,6 +151,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final ExecutorService startAndStopExecutor;
private final WorkerGroupMember member;
private final AtomicBoolean stopping;
private final boolean isTopicTrackingEnabled;
// Track enough information about the current membership state to be able to determine which requests via the API
// and the from other nodes are safe to process
@ -158,7 +160,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private Set<ConnectorTaskId> tasksToRestart = new HashSet<>();
private ExtendedAssignment assignment;
private boolean canReadConfigs;
private ClusterConfigState configState;
// visible for testing
protected ClusterConfigState configState;
// To handle most external requests, like creating or destroying a connector, we can use a generic request where
// the caller specifies all the code that should be executed.
@ -216,6 +219,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.keyRotationIntervalMs = config.getInt(DistributedConfig.INTER_WORKER_KEY_TTL_MS_CONFIG);
this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
this.keyGenerator = config.getInternalRequestKeyGenerator();
this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
@ -1619,6 +1623,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
startAndStop(callables);
log.info("Finished stopping tasks in preparation for rebalance");
if (isTopicTrackingEnabled) {
// Send tombstones to reset active topics for removed connectors only after
// connectors and tasks have been stopped, or these tombstones will be overwritten
resetActiveTopics(connectors, tasks);
}
// Ensure that all status updates have been pushed to the storage system before rebalancing.
// Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
// completes.
@ -1628,6 +1638,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.info("Wasn't able to resume work after last rebalance, can skip stopping connectors and tasks");
}
}
private void resetActiveTopics(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
connectors.stream()
.filter(connectorName -> !configState.contains(connectorName))
.forEach(DistributedHerder.this::resetConnectorActiveTopics);
tasks.stream()
.map(ConnectorTaskId::connector)
.distinct()
.filter(connectorName -> !configState.contains(connectorName))
.forEach(DistributedHerder.this::resetConnectorActiveTopics);
}
}
class HerderMetrics {

View File

@ -87,10 +87,14 @@ public class ConnectorsResource {
private final WorkerConfig config;
@javax.ws.rs.core.Context
private ServletContext context;
private final boolean isTopicTrackingDisabled;
private final boolean isTopicTrackingResetDisabled;
public ConnectorsResource(Herder herder, WorkerConfig config) {
this.herder = herder;
this.config = config;
isTopicTrackingDisabled = !config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
isTopicTrackingResetDisabled = !config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG);
}
@GET
@ -98,7 +102,7 @@ public class ConnectorsResource {
public Response listConnectors(
final @Context UriInfo uriInfo,
final @Context HttpHeaders headers
) throws Throwable {
) {
if (uriInfo.getQueryParameters().containsKey("expand")) {
Map<String, Map<String, Object>> out = new HashMap<>();
for (String connector : herder.connectors()) {
@ -113,7 +117,7 @@ public class ConnectorsResource {
connectorExpansions.put("info", herder.connectorInfo(connector));
break;
default:
log.info("Ignoring unknown expanion type {}", expansion);
log.info("Ignoring unknown expansion type {}", expansion);
}
}
out.put(connector, connectorExpansions);
@ -174,17 +178,16 @@ public class ConnectorsResource {
@GET
@Path("/{connector}/status")
public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) {
return herder.connectorStatus(connector);
}
@GET
@Path("/{connector}/topics")
public Response getConnectorActiveTopics(final @PathParam("connector") String connector) throws Throwable {
if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) {
return Response.status(Response.Status.FORBIDDEN)
.entity("Topic tracking is disabled.")
.build();
public Response getConnectorActiveTopics(final @PathParam("connector") String connector) {
if (isTopicTrackingDisabled) {
throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
"Topic tracking is disabled.");
}
ActiveTopicsInfo info = herder.connectorActiveTopics(connector);
return Response.ok(Collections.singletonMap(info.connector(), info)).build();
@ -192,16 +195,14 @@ public class ConnectorsResource {
@PUT
@Path("/{connector}/topics/reset")
public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) throws Throwable {
if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) {
return Response.status(Response.Status.FORBIDDEN)
.entity("Topic tracking is disabled.")
.build();
public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) {
if (isTopicTrackingDisabled) {
throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
"Topic tracking is disabled.");
}
if (!config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)) {
return Response.status(Response.Status.FORBIDDEN)
.entity("Topic tracking reset is disabled.")
.build();
if (isTopicTrackingResetDisabled) {
throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
"Topic tracking reset is disabled.");
}
herder.resetConnectorActiveTopics(connector);
return Response.accepted().build();
@ -280,7 +281,7 @@ public class ConnectorsResource {
@Path("/{connector}/tasks/{task}/status")
public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @PathParam("task") Integer task) throws Throwable {
final @PathParam("task") Integer task) {
return herder.taskStatus(new ConnectorTaskId(connector, task));
}
@ -306,8 +307,8 @@ public class ConnectorsResource {
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
}
// Check whether the connector name from the url matches the one (if there is one) provided in the connectorconfig
// object. Throw BadRequestException on mismatch, otherwise put connectorname in config
// Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
// object. Throw BadRequestException on mismatch, otherwise put connectorName in config
private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG);
if (includedName != null) {

View File

@ -85,10 +85,10 @@ import java.util.concurrent.ConcurrentMap;
public class KafkaStatusBackingStore implements StatusBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
private static final String TASK_STATUS_PREFIX = "status-task-";
private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
private static final String TOPIC_STATUS_PREFIX = "status-topic-";
private static final String TOPIC_STATUS_SEPARATOR = ":connector-";
public static final String TASK_STATUS_PREFIX = "status-task-";
public static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
public static final String TOPIC_STATUS_PREFIX = "status-topic-";
public static final String TOPIC_STATUS_SEPARATOR = ":connector-";
public static final String STATE_KEY_NAME = "state";
public static final String TRACE_KEY_NAME = "trace";
@ -122,9 +122,9 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
private final Time time;
private final Converter converter;
private final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
private final Map<String, CacheEntry<ConnectorStatus>> connectors;
//visible for testing
protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
private String statusTopic;
@ -435,7 +435,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
}
}
private TopicStatus parseTopicStatus(byte[] data) {
protected TopicStatus parseTopicStatus(byte[] data) {
try {
SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data);
if (!(schemaAndValue.value() instanceof Map)) {

View File

@ -62,4 +62,7 @@ public class Table<R, C, V> {
return Collections.unmodifiableMap(columns);
}
public boolean isEmpty() {
return table.isEmpty();
}
}

View File

@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.integration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
/**
* Integration test for the endpoints that offer topic tracking of a connector's active
* topics.
*/
@Category(IntegrationTest.class)
public class ConnectorTopicsIntegrationTest {
private static final int NUM_WORKERS = 5;
private static final int NUM_TASKS = 1;
private static final String FOO_TOPIC = "foo-topic";
private static final String FOO_CONNECTOR = "foo-source";
private static final String BAR_TOPIC = "bar-topic";
private static final String BAR_CONNECTOR = "bar-source";
private static final String SINK_CONNECTOR = "baz-sink";
private static final int NUM_TOPIC_PARTITIONS = 3;
private EmbeddedConnectCluster.Builder connectBuilder;
private EmbeddedConnectCluster connect;
Map<String, String> workerProps = new HashMap<>();
Properties brokerProps = new Properties();
@Before
public void setup() {
// setup Connect worker properties
workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
// setup Kafka broker properties
brokerProps.put("auto.create.topics.enable", String.valueOf(false));
// build a Connect cluster backed by Kafka and Zk
connectBuilder = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
.workerProps(workerProps)
.brokerProps(brokerProps)
.maskExitProcedures(true); // true is the default, setting here as example
}
@After
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();
}
@Test
public void testGetActiveTopics() throws InterruptedException {
connect = connectBuilder.build();
// start the clusters
connect.start();
// create test topic
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for connector: " + FOO_CONNECTOR);
// start a source connector
connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
"Connector tasks did not start in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.singletonList(FOO_TOPIC),
"Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + FOO_CONNECTOR);
// start another source connector
connect.configureConnector(BAR_CONNECTOR, defaultSourceConnectorProps(BAR_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(BAR_CONNECTOR, NUM_TASKS,
"Connector tasks did not start in time.");
connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.singletonList(BAR_TOPIC),
"Active topic set is not: " + Collections.singletonList(BAR_TOPIC) + " for connector: " + BAR_CONNECTOR);
// start a sink connector
connect.configureConnector(SINK_CONNECTOR, defaultSinkConnectorProps(FOO_TOPIC, BAR_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, NUM_TASKS,
"Connector tasks did not start in time.");
connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC, BAR_TOPIC),
"Active topic set is not: " + Arrays.asList(FOO_TOPIC, BAR_TOPIC) + " for connector: " + SINK_CONNECTOR);
// deleting a connector resets its active topics
connect.deleteConnector(BAR_CONNECTOR);
connect.assertions().assertConnectorAndTasksAreStopped(BAR_CONNECTOR,
"Connector tasks did not stop in time.");
connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for deleted connector: " + BAR_CONNECTOR);
// Unfortunately there's currently no easy way to know when the consumer caught up with
// the last records that the producer of the stopped connector managed to produce.
// Repeated runs show that this amount of time is sufficient for the consumer to catch up.
Thread.sleep(5000);
// reset active topics for the sink connector after one of the topics has become idle
connect.resetConnectorTopics(SINK_CONNECTOR);
connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC),
"Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
}
@Test
public void testTopicTrackingResetIsDisabled() throws InterruptedException {
workerProps.put(TOPIC_TRACKING_ALLOW_RESET_CONFIG, "false");
connect = connectBuilder.build();
// start the clusters
connect.start();
// create test topic
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for connector: " + FOO_CONNECTOR);
// start a source connector
connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
"Connector tasks did not start in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.singletonList(FOO_TOPIC),
"Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + FOO_CONNECTOR);
// start a sink connector
connect.configureConnector(SINK_CONNECTOR, defaultSinkConnectorProps(FOO_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, NUM_TASKS,
"Connector tasks did not start in time.");
connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC),
"Active topic set is not: " + Arrays.asList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
// deleting a connector resets its active topics
connect.deleteConnector(FOO_CONNECTOR);
connect.assertions().assertConnectorAndTasksAreStopped(FOO_CONNECTOR,
"Connector tasks did not stop in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for deleted connector: " + FOO_CONNECTOR);
// Unfortunately there's currently no easy way to know when the consumer caught up with
// the last records that the producer of the stopped connector managed to produce.
// Repeated runs show that this amount of time is sufficient for the consumer to catch up.
Thread.sleep(5000);
// resetting active topics for the sink connector won't work when the config is disabled
Exception e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorTopics(SINK_CONNECTOR));
assertTrue(e.getMessage().contains("Topic tracking reset is disabled."));
connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC),
"Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
}
@Test
public void testTopicTrackingIsDisabled() throws InterruptedException {
workerProps.put(TOPIC_TRACKING_ENABLE_CONFIG, "false");
connect = connectBuilder.build();
// start the clusters
connect.start();
// create test topic
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
// start a source connector
connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
"Connector tasks did not start in time.");
// resetting active topics for the sink connector won't work when the config is disabled
Exception e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorTopics(SINK_CONNECTOR));
assertTrue(e.getMessage().contains("Topic tracking is disabled."));
e = assertThrows(ConnectRestException.class, () -> connect.connectorTopics(SINK_CONNECTOR));
assertTrue(e.getMessage().contains("Topic tracking is disabled."));
// Wait for tasks to produce a few records
Thread.sleep(5000);
assertNoTopicStatusInStatusTopic();
}
public void assertNoTopicStatusInStatusTopic() {
String statusTopic = workerProps.get(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
Consumer<byte[], byte[]> verifiableConsumer = connect.kafka().createConsumer(
Collections.singletonMap("group.id", "verifiable-consumer-group-0"));
List<TopicPartition> partitions =
Optional.ofNullable(verifiableConsumer.partitionsFor(statusTopic))
.orElseThrow(() -> new AssertionError("Unable to retrieve partitions info for status topic"))
.stream()
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());
verifiableConsumer.assign(partitions);
// Based on the implementation of {@link org.apache.kafka.connect.util.KafkaBasedLog#readToLogEnd}
Set<TopicPartition> assignment = verifiableConsumer.assignment();
verifiableConsumer.seekToBeginning(assignment);
Map<TopicPartition, Long> endOffsets = verifiableConsumer.endOffsets(assignment);
while (!endOffsets.isEmpty()) {
Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
if (verifiableConsumer.position(entry.getKey()) >= entry.getValue())
it.remove();
else {
try {
StreamSupport.stream(verifiableConsumer.poll(Duration.ofMillis(Integer.MAX_VALUE)).spliterator(), false)
.map(ConsumerRecord::key)
.filter(Objects::nonNull)
.filter(key -> new String(key, StandardCharsets.UTF_8).startsWith(KafkaStatusBackingStore.TOPIC_STATUS_PREFIX))
.findFirst()
.ifPresent(key -> {
throw new AssertionError("Found unexpected key: " + new String(key, StandardCharsets.UTF_8) + " in status topic");
});
} catch (KafkaException e) {
throw new AssertionError("Error while reading to the end of status topic", e);
}
break;
}
}
}
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPIC_CONFIG, topic);
props.put("throughput", String.valueOf(10));
props.put("messages.per.poll", String.valueOf(10));
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
return props;
}
private Map<String, String> defaultSinkConnectorProps(String... topics) {
// setup up props for the sink connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPICS_CONFIG, String.join(",", topics));
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
return props;
}
}

View File

@ -77,6 +77,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
@Override
public void stop() {
log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
connectorHandle.recordConnectorStop();
}
@ -153,6 +154,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
@Override
public void stop() {
log.info("Stopped {} task {}", this.getClass().getSimpleName(), taskId);
taskHandle.recordTaskStop();
}
}

View File

@ -34,6 +34,7 @@ import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
@ -74,6 +75,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -118,21 +120,24 @@ public class DistributedHerderTest {
private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
private static final Integer MAX_TASKS = 3;
private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
private static final String FOO_TOPIC = "foo";
private static final String BAR_TOPIC = "bar";
private static final String BAZ_TOPIC = "baz";
static {
CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", FOO_TOPIC, BAR_TOPIC));
CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
static {
CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", FOO_TOPIC, BAR_TOPIC, BAZ_TOPIC));
}
private static final Map<String, String> CONN2_CONFIG = new HashMap<>();
static {
CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", FOO_TOPIC, BAR_TOPIC));
CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map<String, String> TASK_CONFIG = new HashMap<>();
@ -361,7 +366,6 @@ public class DistributedHerderTest {
member.requestRejoin();
PowerMock.expectLastCall();
expectPostRebalanceCatchup(SNAPSHOT);
// In the second rebalance the new member gets its assignment and this member has no
// assignments or revocations
@ -372,6 +376,7 @@ public class DistributedHerderTest {
PowerMock.replayAll();
herder.configState = SNAPSHOT;
time.sleep(1000L);
assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
herder.tick();
@ -771,15 +776,37 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
// No immediate action besides this -- change will be picked up via the config log
// The change eventually is reflected to the config topic and the deleted connector and
// tasks are revoked
member.wakeup();
PowerMock.expectLastCall();
TopicStatus fooStatus = new TopicStatus(FOO_TOPIC, CONN1, 0, time.milliseconds());
TopicStatus barStatus = new TopicStatus(BAR_TOPIC, CONN1, 0, time.milliseconds());
EasyMock.expect(statusBackingStore.getAllTopics(EasyMock.eq(CONN1))).andReturn(new HashSet<>(Arrays.asList(fooStatus, barStatus))).times(2);
statusBackingStore.deleteTopic(EasyMock.eq(CONN1), EasyMock.eq(FOO_TOPIC));
PowerMock.expectLastCall().times(2);
statusBackingStore.deleteTopic(EasyMock.eq(CONN1), EasyMock.eq(BAR_TOPIC));
PowerMock.expectLastCall().times(2);
expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1),
ConnectProtocol.Assignment.NO_ERROR, 2,
Collections.emptyList(), Collections.emptyList(), 0);
expectPostRebalanceCatchup(ClusterConfigState.EMPTY);
member.requestRejoin();
PowerMock.expectLastCall();
PowerMock.replayAll();
herder.deleteConnectorConfig(CONN1, putConnectorCallback);
herder.tick();
time.sleep(1000L);
assertStatistics(3, 1, 100, 1000L);
assertStatistics("leaderUrl", false, 3, 1, 100, 1000L);
configUpdateListener.onConnectorConfigRemove(CONN1); // read updated config that removes the connector
herder.configState = ClusterConfigState.EMPTY;
herder.tick();
time.sleep(1000L);
assertStatistics("leaderUrl", true, 3, 1, 100, 2100L);
PowerMock.verifyAll();
}

View File

@ -31,11 +31,13 @@ import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
@ -54,6 +56,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
@ -67,8 +70,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
@RunWith(PowerMockRunner.class)
@PrepareForTest(RestClient.class)
@ -130,16 +137,26 @@ public class ConnectorsResourceTest {
TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
}
private static final Set<String> CONNECTOR_ACTIVE_TOPICS = new HashSet<>(
Arrays.asList("foo_topic", "bar_topic"));
private static final Set<String> CONNECTOR2_ACTIVE_TOPICS = new HashSet<>(
Arrays.asList("foo_topic", "baz_topic"));
@Mock
private Herder herder;
private ConnectorsResource connectorsResource;
private UriInfo forward;
@Mock
private WorkerConfig workerConfig;
@Before
public void setUp() throws NoSuchMethodException {
PowerMock.mockStatic(RestClient.class,
RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class));
connectorsResource = new ConnectorsResource(herder, null);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
PowerMock.replay(workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig);
forward = EasyMock.mock(UriInfo.class);
MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
queryParams.putSingle("forward", "true");
@ -434,8 +451,8 @@ public class ConnectorsResourceTest {
herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, null))
.andReturn(new RestClient.HttpResponse<>(204, new HashMap<String, String>(), null));
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, workerConfig))
.andReturn(new RestClient.HttpResponse<>(204, new HashMap<>(), null));
PowerMock.replayAll();
@ -803,6 +820,90 @@ public class ConnectorsResourceTest {
PowerMock.verifyAll();
}
@Test
public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
PowerMock.reset(workerConfig);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false);
PowerMock.replay(workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig);
PowerMock.replayAll();
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
assertEquals("Topic tracking is disabled.", e.getMessage());
PowerMock.verifyAll();
}
@Test
public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() {
PowerMock.reset(workerConfig);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
HttpHeaders headers = EasyMock.mock(HttpHeaders.class);
PowerMock.replay(workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig);
PowerMock.replayAll();
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
assertEquals("Topic tracking is disabled.", e.getMessage());
PowerMock.verifyAll();
}
@Test
public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() {
PowerMock.reset(workerConfig);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false);
HttpHeaders headers = EasyMock.mock(HttpHeaders.class);
PowerMock.replay(workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig);
PowerMock.replayAll();
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
assertEquals("Topic tracking reset is disabled.", e.getMessage());
PowerMock.verifyAll();
}
@Test
public void testConnectorActiveTopics() {
PowerMock.reset(workerConfig);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
EasyMock.expect(herder.connectorActiveTopics(CONNECTOR_NAME))
.andReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS));
PowerMock.replay(workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig);
PowerMock.replayAll();
Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
Map<String, Map<String, Object>> body = (Map<String, Map<String, Object>>) response.getEntity();
assertEquals(CONNECTOR_NAME, ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).connector());
assertEquals(new HashSet<>(CONNECTOR_ACTIVE_TOPICS),
((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).topics());
PowerMock.verifyAll();
}
@Test
public void testResetConnectorActiveTopics() {
PowerMock.reset(workerConfig);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
HttpHeaders headers = EasyMock.mock(HttpHeaders.class);
herder.resetConnectorActiveTopics(CONNECTOR_NAME);
EasyMock.expectLastCall();
PowerMock.replay(workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig);
PowerMock.replayAll();
Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
PowerMock.verifyAll();
}
private <T> byte[] serializeAsBytes(final T value) throws IOException {
return new ObjectMapper().writeValueAsBytes(value);
}

View File

@ -22,7 +22,9 @@ import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.io.IOException;
@ -35,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
public class FileOffsetBackingStoreTest {
FileOffsetBackingStore store;

View File

@ -17,46 +17,151 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.easymock.Capture;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.CONNECTOR_STATUS_PREFIX;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TASK_STATUS_PREFIX;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TOPIC_STATUS_PREFIX;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TOPIC_STATUS_SEPARATOR;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
public class KafkaStatusBackingStoreFormatTest extends EasyMockSupport {
private static final String STATUS_TOPIC = "status-topic";
private static final String FOO_TOPIC = "foo-topic";
private static final String FOO_CONNECTOR = "foo-source";
private static final String BAR_TOPIC = "bar-topic";
private JsonConverter converter;
private Time time;
private KafkaStatusBackingStore store;
private JsonConverter converter;
@Mock
private KafkaBasedLog<String, byte[]> kafkaBasedLog;
@Before
public void setup() {
time = new MockTime();
converter = new JsonConverter();
converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, null);
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
}
@Test
public void readInvalidStatus() {
String key = "status-unknown";
byte[] value = new byte[0];
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.connectors().isEmpty());
assertTrue(store.tasks.isEmpty());
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.connectors().isEmpty());
assertTrue(store.tasks.isEmpty());
assertTrue(store.topics.isEmpty());
key = CONNECTOR_STATUS_PREFIX;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.connectors().isEmpty());
store.read(statusRecord);
assertTrue(store.connectors().isEmpty());
key = TASK_STATUS_PREFIX;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.tasks.isEmpty());
store.read(statusRecord);
assertTrue(store.tasks.isEmpty());
key = TASK_STATUS_PREFIX + FOO_CONNECTOR + "-#";
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.tasks.isEmpty());
store.read(statusRecord);
assertTrue(store.tasks.isEmpty());
key = TOPIC_STATUS_PREFIX;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
key = TOPIC_STATUS_PREFIX + TOPIC_STATUS_SEPARATOR;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
key = TOPIC_STATUS_PREFIX + FOO_TOPIC + ":";
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
}
@Test
public void readInvalidStatusValue() {
String key = CONNECTOR_STATUS_PREFIX + FOO_CONNECTOR;
byte[] value = "invalid".getBytes();
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.connectors().isEmpty());
store.read(statusRecord);
assertTrue(store.connectors().isEmpty());
key = TASK_STATUS_PREFIX + FOO_CONNECTOR + "-0";
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.tasks.isEmpty());
store.read(statusRecord);
assertTrue(store.tasks.isEmpty());
key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
}
@Test
public void readTopicStatus() {
TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds());
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), Time.SYSTEM.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
byte[] value = store.serializeTopicStatus(topicStatus);
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", value);
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
store.read(statusRecord);
assertTrue(store.topics.containsKey("bar"));
assertTrue(store.topics.get("bar").containsKey("foo"));
assertEquals(topicStatus, store.topics.get("bar").get("foo"));
assertTrue(store.topics.containsKey(FOO_CONNECTOR));
assertTrue(store.topics.get(FOO_CONNECTOR).containsKey(FOO_TOPIC));
assertEquals(topicStatus, store.topics.get(FOO_CONNECTOR).get(FOO_TOPIC));
}
@Test
@ -66,10 +171,132 @@ public class KafkaStatusBackingStoreFormatTest extends EasyMockSupport {
assertTrue(store.topics.containsKey("bar"));
assertTrue(store.topics.get("bar").containsKey("foo"));
assertEquals(topicStatus, store.topics.get("bar").get("foo"));
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", null);
// should return null
byte[] value = store.serializeTopicStatus(null);
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", value);
store.read(statusRecord);
assertTrue(store.topics.containsKey("bar"));
assertFalse(store.topics.get("bar").containsKey("foo"));
assertEquals(Collections.emptyMap(), store.topics.get("bar"));
}
@Test
public void putTopicState() {
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
Capture<byte[]> valueCapture = newCapture();
Capture<Callback> callbackCapture = newCapture();
kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
expectLastCall()
.andAnswer(() -> {
callbackCapture.getValue().onCompletion(null, null);
return null;
});
replayAll();
store.put(topicStatus);
// check capture state
assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
// state is not visible until read back from the log
assertEquals(null, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, valueCapture.getValue());
store.read(statusRecord);
assertEquals(topicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
assertEquals(new HashSet<>(Collections.singletonList(topicStatus)), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
verifyAll();
}
@Test
public void putTopicStateRetriableFailure() {
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
Capture<byte[]> valueCapture = newCapture();
Capture<Callback> callbackCapture = newCapture();
kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
expectLastCall()
.andAnswer(() -> {
callbackCapture.getValue().onCompletion(null, new TimeoutException());
return null;
})
.andAnswer(() -> {
callbackCapture.getValue().onCompletion(null, null);
return null;
});
replayAll();
store.put(topicStatus);
// check capture state
assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
// state is not visible until read back from the log
assertEquals(null, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
verifyAll();
}
@Test
public void putTopicStateNonRetriableFailure() {
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
Capture<byte[]> valueCapture = newCapture();
Capture<Callback> callbackCapture = newCapture();
kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
expectLastCall()
.andAnswer(() -> {
callbackCapture.getValue().onCompletion(null, new UnknownServerException());
return null;
});
replayAll();
// the error is logged and ignored
store.put(topicStatus);
// check capture state
assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
// state is not visible until read back from the log
assertEquals(null, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
verifyAll();
}
@Test
public void putTopicStateShouldOverridePreviousState() {
TopicStatus firstTopicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR,
0), time.milliseconds());
time.sleep(1000);
TopicStatus secondTopicStatus = new TopicStatus(BAR_TOPIC, new ConnectorTaskId(FOO_CONNECTOR,
0), time.milliseconds());
String firstKey = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
String secondKey = TOPIC_STATUS_PREFIX + BAR_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
Capture<byte[]> valueCapture = newCapture();
Capture<Callback> callbackCapture = newCapture();
kafkaBasedLog.send(eq(secondKey), capture(valueCapture), capture(callbackCapture));
expectLastCall()
.andAnswer(() -> {
callbackCapture.getValue().onCompletion(null, null);
// The second status record is read soon after it's persisted in the status topic
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, secondKey, valueCapture.getValue());
store.read(statusRecord);
return null;
});
replayAll();
byte[] value = store.serializeTopicStatus(firstTopicStatus);
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, firstKey, value);
store.read(statusRecord);
store.put(secondTopicStatus);
// check capture state
assertEquals(secondTopicStatus, store.parseTopicStatus(valueCapture.getValue()));
assertEquals(firstTopicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
assertEquals(secondTopicStatus, store.getTopic(FOO_CONNECTOR, BAR_TOPIC));
assertEquals(new HashSet<>(Arrays.asList(firstTopicStatus, secondTopicStatus)),
new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
verifyAll();
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.record.TimestampType;
@ -27,15 +28,23 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static org.easymock.EasyMock.anyObject;
@ -45,8 +54,11 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("unchecked")
@RunWith(PowerMockRunner.class)
public class KafkaStatusBackingStoreTest extends EasyMockSupport {
private static final String STATUS_TOPIC = "status-topic";
@ -54,12 +66,34 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
private static final String CONNECTOR = "conn";
private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);
private KafkaStatusBackingStore store;
@Mock
Converter converter;
@Mock
private KafkaBasedLog<String, byte[]> kafkaBasedLog;
@Mock
WorkerConfig workerConfig;
@Before
public void setup() {
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
}
@Test
public void misconfigurationOfStatusBackingStore() {
expect(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).andReturn(null);
expect(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).andReturn(" ");
replayAll();
Exception e = assertThrows(ConfigException.class, () -> store.configure(workerConfig));
assertEquals("Must specify topic for connector status.", e.getMessage());
e = assertThrows(ConfigException.class, () -> store.configure(workerConfig));
assertEquals("Must specify topic for connector status.", e.getMessage());
verifyAll();
}
@Test
public void putConnectorState() {
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@ -87,10 +121,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putConnectorStateRetriableFailure() {
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@ -125,10 +155,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putConnectorStateNonRetriableFailure() {
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@ -160,10 +186,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
byte[] value = new byte[0];
String otherWorkerId = "anotherhost:8083";
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
// the persisted came from a different host and has a newer generation
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", otherWorkerId);
@ -188,10 +210,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putSafeWithNoPreviousValueIsPropagated() {
final Converter converter = mock(Converter.class);
final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
final byte[] value = new byte[0];
final Capture<Struct> statusValueStruct = newCapture();
@ -217,10 +235,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
public void putSafeOverridesValueSetBySameWorker() {
final byte[] value = new byte[0];
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
// the persisted came from the same host, but has a newer generation
Map<String, Object> firstStatusRead = new HashMap<>();
firstStatusRead.put("worker_id", WORKER_ID);
@ -267,10 +281,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
final byte[] value = new byte[0];
String otherWorkerId = "anotherhost:8083";
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
// the persisted came from a different host and has a newer generation
Map<String, Object> firstStatusRead = new HashMap<>();
firstStatusRead.put("worker_id", otherWorkerId);
@ -315,10 +325,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
public void readConnectorState() {
byte[] value = new byte[0];
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
@ -339,10 +345,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
@Test
public void putTaskState() {
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
byte[] value = new byte[0];
expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
.andStubReturn(value);
@ -372,10 +374,6 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
public void readTaskState() {
byte[] value = new byte[0];
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
@ -394,6 +392,69 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
verifyAll();
}
@Test
public void deleteConnectorState() {
final byte[] value = new byte[0];
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class));
EasyMock.expectLastCall().andReturn(value);
kafkaBasedLog.send(eq("status-connector-" + CONNECTOR), eq(value), anyObject(Callback.class));
expectLastCall();
converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class));
EasyMock.expectLastCall().andReturn(value);
kafkaBasedLog.send(eq("status-task-conn-0"), eq(value), anyObject(Callback.class));
expectLastCall();
expect(converter.toConnectData(STATUS_TOPIC, value)).andReturn(new SchemaAndValue(null, statusMap));
replayAll();
ConnectorStatus connectorStatus = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
store.put(connectorStatus);
TaskStatus taskStatus = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
store.put(taskStatus);
store.read(consumerRecord(0, "status-task-conn-0", value));
assertEquals(new HashSet<>(Collections.singletonList(CONNECTOR)), store.connectors());
assertEquals(new HashSet<>(Collections.singletonList(taskStatus)), new HashSet<>(store.getAll(CONNECTOR)));
store.read(consumerRecord(0, "status-connector-conn", null));
assertTrue(store.connectors().isEmpty());
assertTrue(store.getAll(CONNECTOR).isEmpty());
verifyAll();
}
@Test
public void deleteTaskState() {
final byte[] value = new byte[0];
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class));
EasyMock.expectLastCall().andReturn(value);
kafkaBasedLog.send(eq("status-task-conn-0"), eq(value), anyObject(Callback.class));
expectLastCall();
expect(converter.toConnectData(STATUS_TOPIC, value)).andReturn(new SchemaAndValue(null, statusMap));
replayAll();
TaskStatus taskStatus = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
store.put(taskStatus);
store.read(consumerRecord(0, "status-task-conn-0", value));
assertEquals(new HashSet<>(Collections.singletonList(taskStatus)), new HashSet<>(store.getAll(CONNECTOR)));
store.read(consumerRecord(0, "status-task-conn-0", null));
assertTrue(store.getAll(CONNECTOR).isEmpty());
verifyAll();
}
private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
TimestampType.CREATE_TIME, 0L, 0, 0, key, value);

View File

@ -16,9 +16,11 @@
*/
package org.apache.kafka.connect.util.clusters;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@ -36,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@ -355,6 +358,52 @@ public class EmbeddedConnectCluster {
"Could not read connector state. Error response: " + responseToString(response));
}
/**
* Get the active topics of a connector running in this cluster.
*
* @param connectorName name of the connector
* @return an instance of {@link ConnectorStateInfo} populated with state information of the connector and its tasks.
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code.
* @throws ConnectException for any other error.
*/
public ActiveTopicsInfo connectorTopics(String connectorName) {
ObjectMapper mapper = new ObjectMapper();
String url = endpointForResource(String.format("connectors/%s/topics", connectorName));
Response response = requestGet(url);
try {
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
Map<String, Map<String, List<String>>> activeTopics = mapper
.readerFor(new TypeReference<Map<String, Map<String, List<String>>>>() { })
.readValue(responseToString(response));
return new ActiveTopicsInfo(connectorName,
activeTopics.get(connectorName).getOrDefault("topics", Collections.emptyList()));
}
} catch (IOException e) {
log.error("Could not read connector state from response: {}",
responseToString(response), e);
throw new ConnectException("Could not not parse connector state", e);
}
throw new ConnectRestException(response.getStatus(),
"Could not read connector state. Error response: " + responseToString(response));
}
/**
* Reset the set of active topics of a connector running in this cluster.
*
* @param connectorName name of the connector
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code.
* @throws ConnectException for any other error.
*/
public void resetConnectorTopics(String connectorName) {
String url = endpointForResource(String.format("connectors/%s/topics/reset", connectorName));
Response response = requestPut(url, null);
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
throw new ConnectRestException(response.getStatus(),
"Resetting active topics for connector " + connectorName + " failed. "
+ "Error response: " + responseToString(response));
}
}
/**
* Get the full URL of the admin endpoint that corresponds to the given REST resource
*

View File

@ -17,12 +17,14 @@
package org.apache.kafka.connect.util.clusters;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
@ -37,6 +39,7 @@ public class EmbeddedConnectClusterAssertions {
private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
public static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
private final EmbeddedConnectCluster connect;
@ -243,4 +246,43 @@ public class EmbeddedConnectClusterAssertions {
}
}
/**
* Assert that a connector's set of active topics matches the given collection of topic names.
*
* @param connectorName the connector name
* @param topics a collection of topics to compare against
* @param detailMessage the assertion message
* @throws InterruptedException
*/
public void assertConnectorActiveTopics(String connectorName, Collection<String> topics, String detailMessage) throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorActiveTopics(connectorName, topics).orElse(false),
CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS,
"Connector active topics don't match the expected collection");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
}
/**
* Check whether a connector's set of active topics matches the given collection of topic names.
*
* @param connectorName the connector name
* @param topics a collection of topics to compare against
* @return true if the connector's active topics matches the given collection; false otherwise
*/
protected Optional<Boolean> checkConnectorActiveTopics(String connectorName, Collection<String> topics) {
try {
ActiveTopicsInfo info = connect.connectorTopics(connectorName);
boolean result = info != null
&& topics.size() == info.topics().size()
&& topics.containsAll(info.topics());
log.debug("Found connector {} using topics: {}", connectorName, info.topics());
return Optional.of(result);
} catch (Exception e) {
log.error("Could not check connector {} state info.", connectorName, e);
return Optional.empty();
}
}
}