KAFKA-3863: System tests covering connector/task failure and restart

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1519 from hachikuji/KAFKA-3863
This commit is contained in:
Jason Gustafson 2016-06-22 17:06:49 -07:00 committed by Ewen Cheslack-Postava
parent 8bf18df1b6
commit 36cab7dbdf
11 changed files with 554 additions and 7 deletions

View File

@ -32,6 +32,9 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.tools.MockConnector;
import org.apache.kafka.connect.tools.MockSinkConnector;
import org.apache.kafka.connect.tools.MockSourceConnector;
import org.apache.kafka.connect.tools.VerifiableSinkConnector;
import org.apache.kafka.connect.tools.VerifiableSourceConnector;
import org.apache.kafka.connect.util.ConnectorTaskId;
@ -87,7 +90,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private static List<ConnectorPluginInfo> validConnectorPlugins;
private static final Object LOCK = new Object();
private Thread classPathTraverser;
private static final List<Class<? extends Connector>> EXCLUDES = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
private static final List<Class<? extends Connector>> EXCLUDES = Arrays.asList(
VerifiableSourceConnector.class, VerifiableSinkConnector.class,
MockConnector.class, MockSourceConnector.class, MockSinkConnector.class);
public AbstractHerder(Worker worker,
String workerId,

View File

@ -401,6 +401,7 @@ public class Worker {
WorkerTask task = getTask(id);
stopTask(task);
awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG));
log.info("Task {} completed shutdown.", task.id());
}
/**

View File

@ -66,10 +66,9 @@ public class WorkerConnector {
}
public void initialize(ConnectorConfig connectorConfig) {
log.debug("Initializing connector {} with config {}", connName, config);
try {
this.config = connectorConfig.originalsStrings();
log.debug("Initializing connector {} with config {}", connName, config);
connector.initialize(new ConnectorContext() {
@Override

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* This connector provides support for mocking certain connector behaviors. For example,
* this can be used to simulate connector or task failures. It works by passing a "mock mode"
* through configuration from the system test. New mock behavior can be implemented either
* in the connector or in the task by providing a new mode implementation.
*
* At the moment, this connector only supports a single task and shares configuration between
* the connector and its tasks.
*
* @see MockSinkConnector
* @see MockSourceConnector
*/
public class MockConnector extends Connector {
public static final String MOCK_MODE_KEY = "mock_mode";
public static final String DELAY_MS_KEY = "delay_ms";
public static final String CONNECTOR_FAILURE = "connector-failure";
public static final String TASK_FAILURE = "task-failure";
public static final long DEFAULT_FAILURE_DELAY_MS = 15000;
private Map<String, String> config;
private ScheduledExecutorService executor;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> config) {
this.config = config;
if (CONNECTOR_FAILURE.equals(config.get(MOCK_MODE_KEY))) {
// Schedule this connector to raise an exception after some delay
String delayMsString = config.get(DELAY_MS_KEY);
long delayMs = DEFAULT_FAILURE_DELAY_MS;
if (delayMsString != null)
delayMs = Long.parseLong(delayMsString);
executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(new Runnable() {
@Override
public void run() {
context.raiseError(new RuntimeException());
}
}, delayMs, TimeUnit.MILLISECONDS);
}
}
@Override
public Class<? extends Task> taskClass() {
throw new UnsupportedOperationException();
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.singletonList(config);
}
@Override
public void stop() {
if (executor != null) {
executor.shutdownNow();
try {
if (!executor.awaitTermination(20, TimeUnit.SECONDS))
throw new RuntimeException("Failed timely termination of scheduler");
} catch (InterruptedException e) {
throw new RuntimeException("Task was interrupted during shutdown");
}
}
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import java.util.List;
import java.util.Map;
/**
* Mock sink implementation which delegates to {@link MockConnector}.
*/
public class MockSinkConnector extends SinkConnector {
private MockConnector delegate = new MockConnector();
@Override
public void initialize(ConnectorContext ctx) {
delegate.initialize(ctx);
}
@Override
public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
delegate.initialize(ctx, taskConfigs);
}
@Override
public void reconfigure(Map<String, String> props) {
delegate.reconfigure(props);
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
return delegate.validate(connectorConfigs);
}
@Override
public String version() {
return delegate.version();
}
@Override
public void start(Map<String, String> props) {
delegate.start(props);
}
@Override
public Class<? extends Task> taskClass() {
return MockSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return delegate.taskConfigs(maxTasks);
}
@Override
public void stop() {
delegate.stop();
}
@Override
public ConfigDef config() {
return delegate.config();
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import java.util.Collection;
import java.util.Map;
public class MockSinkTask extends SinkTask {
private String mockMode;
private long startTimeMs;
private long failureDelayMs;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> config) {
this.mockMode = config.get(MockConnector.MOCK_MODE_KEY);
if (MockConnector.TASK_FAILURE.equals(mockMode)) {
this.startTimeMs = System.currentTimeMillis();
String delayMsString = config.get(MockConnector.DELAY_MS_KEY);
this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
if (delayMsString != null)
failureDelayMs = Long.parseLong(delayMsString);
}
}
@Override
public void put(Collection<SinkRecord> records) {
if (MockConnector.TASK_FAILURE.equals(mockMode)) {
long now = System.currentTimeMillis();
if (now > startTimeMs + failureDelayMs)
throw new RuntimeException();
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.List;
import java.util.Map;
/**
* Mock source implementation which delegates to {@link MockConnector}.
*/
public class MockSourceConnector extends SourceConnector {
private MockConnector delegate = new MockConnector();
@Override
public void initialize(ConnectorContext ctx) {
delegate.initialize(ctx);
}
@Override
public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
delegate.initialize(ctx, taskConfigs);
}
@Override
public void reconfigure(Map<String, String> props) {
delegate.reconfigure(props);
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
return delegate.validate(connectorConfigs);
}
@Override
public String version() {
return delegate.version();
}
@Override
public void start(Map<String, String> props) {
delegate.start(props);
}
@Override
public Class<? extends Task> taskClass() {
return MockSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return delegate.taskConfigs(maxTasks);
}
@Override
public void stop() {
delegate.stop();
}
@Override
public ConfigDef config() {
return delegate.config();
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MockSourceTask extends SourceTask {
private String mockMode;
private long startTimeMs;
private long failureDelayMs;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> config) {
this.mockMode = config.get(MockConnector.MOCK_MODE_KEY);
if (MockConnector.TASK_FAILURE.equals(mockMode)) {
this.startTimeMs = System.currentTimeMillis();
String delayMsString = config.get(MockConnector.DELAY_MS_KEY);
this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
if (delayMsString != null)
failureDelayMs = Long.parseLong(delayMsString);
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
if (MockConnector.TASK_FAILURE.equals(mockMode)) {
long now = System.currentTimeMillis();
if (now > startTimeMs + failureDelayMs)
throw new RuntimeException();
}
return Collections.emptyList();
}
@Override
public void stop() {
}
}

View File

@ -39,6 +39,9 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.tools.MockConnector;
import org.apache.kafka.connect.tools.MockSinkConnector;
import org.apache.kafka.connect.tools.MockSourceConnector;
import org.apache.kafka.connect.tools.VerifiableSinkConnector;
import org.apache.kafka.connect.tools.VerifiableSourceConnector;
import org.easymock.EasyMock;
@ -165,6 +168,9 @@ public class ConnectorPluginsResourceTest {
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName())));
assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
}

View File

@ -135,6 +135,12 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
def get_connector_status(self, name, node=None):
return self._rest('/connectors/' + name + '/status', node=node)
def restart_connector(self, name, node=None):
return self._rest('/connectors/' + name + '/restart', method="POST")
def restart_task(self, connector_name, task_id, node=None):
return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart', method="POST")
def pause_connector(self, name, node=None):
return self._rest('/connectors/' + name + '/pause', method="PUT")
@ -331,3 +337,46 @@ class VerifiableSink(VerifiableConnector):
'tasks.max': self.tasks,
'topics': ",".join(self.topics)
})
class MockSink(object):
def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
self.cc = cc
self.logger = self.cc.logger
self.name = name
self.mode = mode
self.delay_sec = delay_sec
self.topics = topics
def start(self):
self.logger.info("Creating connector MockSinkConnector %s", self.name)
self.cc.create_connector({
'name': self.name,
'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
'tasks.max': 1,
'topics': ",".join(self.topics),
'mock_mode': self.mode,
'delay_ms': self.delay_sec * 1000
})
class MockSource(object):
def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-source"):
self.cc = cc
self.logger = self.cc.logger
self.name = name
self.mode = mode
self.delay_sec = delay_sec
self.topics = topics
def start(self):
self.logger.info("Creating connector MockSourceConnector %s", self.name)
self.cc.create_connector({
'name': self.name,
'connector.class': 'org.apache.kafka.connect.tools.MockSourceConnector',
'tasks.max': 1,
'topics': ",".join(self.topics),
'mock_mode': self.mode,
'delay_ms': self.delay_sec * 1000
})

View File

@ -17,7 +17,7 @@ from ducktape.tests.test import Test
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
@ -88,9 +88,23 @@ class ConnectDistributedTest(Test):
except ConnectRestError:
return None
def _has_state(self, status, state):
def _connector_has_state(self, status, state):
return status is not None and status['connector']['state'] == state
def _task_has_state(self, task_id, status, state):
if not status:
return False
tasks = status['tasks']
if not tasks:
return False
for task in tasks:
if task['id'] == task_id:
return task['state'] == state
return False
def _all_tasks_have_state(self, status, task_count, state):
if status is None:
return False
@ -103,11 +117,68 @@ class ConnectDistributedTest(Test):
def is_running(self, connector, node=None):
status = self._connector_status(connector.name, node)
return self._has_state(status, 'RUNNING') and self._all_tasks_have_state(status, connector.tasks, 'RUNNING')
return self._connector_has_state(status, 'RUNNING') and self._all_tasks_have_state(status, connector.tasks, 'RUNNING')
def is_paused(self, connector, node=None):
status = self._connector_status(connector.name, node)
return self._has_state(status, 'PAUSED') and self._all_tasks_have_state(status, connector.tasks, 'PAUSED')
return self._connector_has_state(status, 'PAUSED') and self._all_tasks_have_state(status, connector.tasks, 'PAUSED')
def connector_is_running(self, connector, node=None):
status = self._connector_status(connector.name, node)
return self._connector_has_state(status, 'RUNNING')
def connector_is_failed(self, connector, node=None):
status = self._connector_status(connector.name, node)
return self._connector_has_state(status, 'FAILED')
def task_is_failed(self, connector, task_id, node=None):
status = self._connector_status(connector.name, node)
return self._task_has_state(task_id, status, 'FAILED')
def task_is_running(self, connector, task_id, node=None):
status = self._connector_status(connector.name, node)
return self._task_has_state(task_id, status, 'RUNNING')
def test_restart_failed_connector(self):
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()
self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
self.sink.start()
wait_until(lambda: self.connector_is_failed(self.sink), timeout_sec=15,
err_msg="Failed to see connector transition to the FAILED state")
self.cc.restart_connector(self.sink.name)
wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
err_msg="Failed to see connector transition to the RUNNING state")
@matrix(connector_type=["source", "sink"])
def test_restart_failed_task(self, connector_type):
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()
connector = None
if connector_type == "sink":
connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
else:
connector = MockSource(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
connector.start()
task_id = 0
wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=15,
err_msg="Failed to see task transition to the FAILED state")
self.cc.restart_task(connector.name, task_id)
wait_until(lambda: self.task_is_running(connector, task_id), timeout_sec=10,
err_msg="Failed to see task transition to the RUNNING state")
def test_pause_and_resume_source(self):
"""