KAFKA-2752: Add VerifiableSource/Sink connectors and rolling bounce Copycat system tests.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ben Stopford, Geoff Anderson, Guozhang Wang

Closes #432 from ewencp/kafka-2752-copycat-clean-bounce-test
This commit is contained in:
Ewen Cheslack-Postava 2015-11-10 14:54:15 -08:00 committed by Guozhang Wang
parent 64a1bfeb9b
commit 8db55618d5
24 changed files with 873 additions and 64 deletions

View File

@ -72,7 +72,7 @@ do
CLASSPATH=$CLASSPATH:$dir/*
done
for cc_pkg in "api" "runtime" "file" "json"
for cc_pkg in "api" "runtime" "file" "json" "tools"
do
for file in $base_dir/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do

View File

@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
}
def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools']
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
@ -350,6 +350,8 @@ project(':core') {
from(project(':connect:json').configurations.runtime) { into("libs/") }
from(project(':connect:file').jar) { into("libs/") }
from(project(':connect:file').configurations.runtime) { into("libs/") }
from(project(':connect:tools').jar) { into("libs/") }
from(project(':connect:tools').configurations.runtime) { into("libs/") }
}
jar {
@ -887,3 +889,64 @@ project(':connect:file') {
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
project(':connect:tools') {
apply plugin: 'checkstyle'
archivesBaseName = "connect-tools"
dependencies {
compile project(':connect:api')
compile "$slf4japi"
compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
testCompile "$junit"
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j"
}
task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
}
test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}
javadoc {
include "**/org/apache/kafka/connect/*"
}
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
exclude('connect-*')
}
into "$buildDir/dependant-libs"
}
jar {
dependsOn copyDependantLibs
}
artifacts {
archives testJar
}
configurations {
archives.extendsFrom(testCompile)
}
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

View File

@ -205,6 +205,10 @@
<allow pkg="org.powermock" />
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="com.fasterxml.jackson" />
</subpackage>
</subpackage>
</import-control>

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.tools;
package org.apache.kafka.common.utils;
/**
@ -46,6 +46,7 @@ public class ThroughputThrottler {
long sleepDeficitNs = 0;
long targetThroughput = -1;
long startMs;
private boolean wakeup = false;
/**
* @param targetThroughput Can be messages/sec or bytes/sec
@ -83,7 +84,11 @@ public class ThroughputThrottler {
public void throttle() {
if (targetThroughput == 0) {
try {
Thread.sleep(Long.MAX_VALUE);
synchronized (this) {
while (!wakeup) {
this.wait();
}
}
} catch (InterruptedException e) {
// do nothing
}
@ -96,12 +101,21 @@ public class ThroughputThrottler {
// If enough sleep deficit has accumulated, sleep a little
if (sleepDeficitNs >= MIN_SLEEP_NS) {
long sleepMs = sleepDeficitNs / 1000000;
long sleepNs = sleepDeficitNs - sleepMs * 1000000;
long sleepStartNs = System.nanoTime();
long currentTimeNs = sleepStartNs;
try {
Thread.sleep(sleepMs, (int) sleepNs);
synchronized (this) {
long elapsed = currentTimeNs - sleepStartNs;
long remaining = sleepDeficitNs - elapsed;
while (!wakeup && remaining > 0) {
long sleepMs = remaining / 1000000;
long sleepNs = remaining - sleepMs * 1000000;
this.wait(sleepMs, (int) sleepNs);
elapsed = System.nanoTime() - sleepStartNs;
remaining = sleepDeficitNs - elapsed;
}
wakeup = false;
}
sleepDeficitNs = 0;
} catch (InterruptedException e) {
// If sleep is cut short, reduce deficit by the amount of
@ -113,5 +127,15 @@ public class ThroughputThrottler {
}
}
}
/**
* Wakeup the throttler if its sleeping.
*/
public void wakeup() {
synchronized (this) {
wakeup = true;
this.notifyAll();
}
}
}

View File

@ -56,6 +56,11 @@ public class SourceRecord extends ConnectRecord {
this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
}
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Schema keySchema, Object key, Schema valueSchema, Object value) {
this(sourcePartition, sourceOffset, topic, null, keySchema, key, valueSchema, value);
}
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key, Schema valueSchema, Object value) {

View File

@ -230,6 +230,13 @@ class WorkerSinkTask implements WorkerTask {
return workerConfig;
}
@Override
public String toString() {
return "WorkerSinkTask{" +
"id=" + id +
'}';
}
private KafkaConsumer<byte[], byte[]> createConsumer() {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task

View File

@ -80,7 +80,7 @@ class WorkerSinkTaskThread extends ShutdownableThread {
long commitTimeout = commitStarted + task.workerConfig().getLong(
WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
if (committing && now >= commitTimeout) {
log.warn("Commit of {} offsets timed out", this);
log.warn("Commit of {} offsets timed out", task);
commitFailures++;
committing = false;
}
@ -98,11 +98,11 @@ class WorkerSinkTaskThread extends ShutdownableThread {
seqno, commitSeqno);
} else {
if (error != null) {
log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
log.error("Commit of {} offsets threw an unexpected exception: ", task, error);
commitFailures++;
} else {
log.debug("Finished {} offset commit successfully in {} ms",
this, task.time().milliseconds() - commitStarted);
task, task.time().milliseconds() - commitStarted);
commitFailures = 0;
}
committing = false;

View File

@ -178,6 +178,8 @@ class WorkerSourceTask implements WorkerTask {
public boolean commitOffsets() {
long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
log.debug("{} Committing offsets", this);
long started = time.milliseconds();
long timeout = started + commitTimeoutMs;
@ -259,7 +261,7 @@ class WorkerSourceTask implements WorkerTask {
}
finishSuccessfulFlush();
log.debug("Finished {} commitOffsets successfully in {} ms",
log.info("Finished {} commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
return true;
}

View File

@ -133,6 +133,8 @@ public class RestServer {
}
public void stop() {
log.info("Stopping REST server");
try {
jettyServer.stop();
jettyServer.join();
@ -141,6 +143,8 @@ public class RestServer {
} finally {
jettyServer.destroy();
}
log.info("REST server stopped");
}
/**

View File

@ -27,6 +27,8 @@ import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
@ -51,6 +53,8 @@ import java.util.concurrent.TimeoutException;
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class ConnectorsResource {
private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
// TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
@ -159,7 +163,9 @@ public class ConnectorsResource {
} catch (ExecutionException e) {
if (e.getCause() instanceof NotLeaderException) {
NotLeaderException notLeaderError = (NotLeaderException) e.getCause();
return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType));
String forwardUrl = RestServer.urlJoin(notLeaderError.leaderUrl(), path);
log.debug("Forwarding request to leader: {} {} {}", forwardUrl, method, body);
return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType));
}
throw e.getCause();

View File

@ -189,6 +189,7 @@ public class KafkaBasedLog<K, V> {
* @param callback the callback to invoke once the end of the log has been reached.
*/
public void readToEnd(Callback<Void> callback) {
log.trace("Starting read to end log for topic {}", topic);
producer.flush();
synchronized (this) {
readLogEndOffsetCallbacks.add(callback);
@ -286,6 +287,10 @@ public class KafkaBasedLog<K, V> {
private class WorkThread extends Thread {
public WorkThread() {
super("KafkaBasedLog Work Thread - " + topic);
}
@Override
public void run() {
try {
@ -300,6 +305,7 @@ public class KafkaBasedLog<K, V> {
if (numCallbacks > 0) {
try {
readToLogEnd();
log.trace("Finished read to end log for topic {}", topic);
} catch (WakeupException e) {
// Either received another get() call and need to retry reading to end of log or stop() was
// called. Both are handled by restarting this loop.

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @see VerifiableSinkTask
*/
public class VerifiableSinkConnector extends SourceConnector {
private Map<String, String> config;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> props) {
this.config = props;
}
@Override
public Class<? extends Task> taskClass() {
return VerifiableSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (Integer i = 0; i < maxTasks; i++) {
Map<String, String> props = new HashMap<>(config);
props.put(VerifiableSinkTask.ID_CONFIG, i.toString());
configs.add(props);
}
return configs;
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Counterpart to {@link VerifiableSourceTask} that consumes records and logs information about each to stdout. This
* allows validation of processing of messages by sink tasks on distributed workers even in the face of worker restarts
* and failures. This task relies on the offset management provided by the Kafka Connect framework and therefore can detect
* bugs in its implementation.
*/
public class VerifiableSinkTask extends SinkTask {
public static final String NAME_CONFIG = "name";
public static final String ID_CONFIG = "id";
private static final ObjectMapper JSON_SERDE = new ObjectMapper();
private String name; // Connector name
private int id; // Task ID
private ArrayList<Map<String, Object>> unflushed = new ArrayList<>();
@Override
public String version() {
return new VerifiableSinkConnector().version();
}
@Override
public void start(Map<String, String> props) {
try {
name = props.get(NAME_CONFIG);
id = Integer.parseInt(props.get(ID_CONFIG));
} catch (NumberFormatException e) {
throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
}
}
@Override
public void put(Collection<SinkRecord> records) {
long nowMs = System.currentTimeMillis();
for (SinkRecord record : records) {
Map<String, Object> data = new HashMap<>();
data.put("name", name);
data.put("task", record.key()); // VerifiableSourceTask's input task (source partition)
data.put("sinkTask", id);
data.put("topic", record.topic());
data.put("time_ms", nowMs);
data.put("seqno", record.value());
data.put("offset", record.kafkaOffset());
String dataJson;
try {
dataJson = JSON_SERDE.writeValueAsString(data);
} catch (JsonProcessingException e) {
dataJson = "Bad data can't be written as json: " + e.getMessage();
}
System.out.println(dataJson);
unflushed.add(data);
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
long nowMs = System.currentTimeMillis();
for (Map<String, Object> data : unflushed) {
data.put("time_ms", nowMs);
data.put("flushed", true);
String dataJson;
try {
dataJson = JSON_SERDE.writeValueAsString(data);
} catch (JsonProcessingException e) {
dataJson = "Bad data can't be written as json: " + e.getMessage();
}
System.out.println(dataJson);
}
unflushed.clear();
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @see VerifiableSourceTask
*/
public class VerifiableSourceConnector extends SourceConnector {
private Map<String, String> config;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> props) {
this.config = props;
}
@Override
public Class<? extends Task> taskClass() {
return VerifiableSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (Integer i = 0; i < maxTasks; i++) {
Map<String, String> props = new HashMap<>(config);
props.put(VerifiableSourceTask.ID_CONFIG, i.toString());
configs.add(props);
}
return configs;
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.ThroughputThrottler;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The
* tasks print metadata in the form of JSON to stdout for each message generated, making externally visible which
* messages have been sent. Each message is also assigned a unique, increasing seqno that is passed to Kafka Connect; when
* tasks are started on new nodes, this seqno is used to resume where the task previously left off, allowing for
* testing of distributed Kafka Connect.
*
* If logging is left enabled, log output on stdout can be easily ignored by checking whether a given line is valid JSON.
*/
public class VerifiableSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class);
public static final String NAME_CONFIG = "name";
public static final String ID_CONFIG = "id";
public static final String TOPIC_CONFIG = "topic";
public static final String THROUGHPUT_CONFIG = "throughput";
private static final String ID_FIELD = "id";
private static final String SEQNO_FIELD = "seqno";
private static final ObjectMapper JSON_SERDE = new ObjectMapper();
private String name; // Connector name
private int id; // Task ID
private String topic;
private Map<String, Integer> partition;
private long startingSeqno;
private long seqno;
private ThroughputThrottler throttler;
@Override
public String version() {
return new VerifiableSourceConnector().version();
}
@Override
public void start(Map<String, String> props) {
final long throughput;
try {
name = props.get(NAME_CONFIG);
id = Integer.parseInt(props.get(ID_CONFIG));
topic = props.get(TOPIC_CONFIG);
throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
} catch (NumberFormatException e) {
throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
}
partition = Collections.singletonMap(ID_FIELD, id);
Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
if (previousOffset != null)
seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
else
seqno = 0;
startingSeqno = seqno;
throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
long sendStartMs = System.currentTimeMillis();
if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs))
throttler.throttle();
long nowMs = System.currentTimeMillis();
Map<String, Object> data = new HashMap<>();
data.put("name", name);
data.put("task", id);
data.put("topic", this.topic);
data.put("time_ms", nowMs);
data.put("seqno", seqno);
String dataJson;
try {
dataJson = JSON_SERDE.writeValueAsString(data);
} catch (JsonProcessingException e) {
dataJson = "Bad data can't be written as json: " + e.getMessage();
}
System.out.println(dataJson);
Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
List<SourceRecord> result = Arrays.asList(srcRecord);
seqno++;
return result;
}
@Override
public void stop() {
throttler.wakeup();
}
}

View File

@ -15,4 +15,4 @@
apply from: file('scala.gradle')
include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
'connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools'

View File

@ -18,14 +18,30 @@ from ducktape.utils.util import wait_until
from ducktape.errors import DucktapeError
from kafkatest.services.kafka.directory import kafka_dir
import signal, random, requests
import signal, random, requests, os.path, json
class ConnectServiceBase(Service):
"""Base class for Kafka Connect services providing some common settings and functionality"""
PERSISTENT_ROOT = "/mnt/connect"
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect.properties")
# The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
# so they can be used for other output, e.g. verifiable source & sink.
LOG_FILE = os.path.join(PERSISTENT_ROOT, "connect.log")
STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "connect.stdout")
STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr")
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
logs = {
"kafka_log": {
"path": "/mnt/connect.log",
"connect_log": {
"path": LOG_FILE,
"collect_default": True},
"connect_stdout": {
"path": STDOUT_FILE,
"collect_default": False},
"connect_stderr": {
"path": STDERR_FILE,
"collect_default": True},
}
@ -37,7 +53,7 @@ class ConnectServiceBase(Service):
def pids(self, node):
"""Return process ids for Kafka Connect processes."""
try:
return [pid for pid in node.account.ssh_capture("cat /mnt/connect.pid", callback=int)]
return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
except:
return []
@ -52,33 +68,31 @@ class ConnectServiceBase(Service):
self.connector_config_templates = connector_config_templates
def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig, allow_fail=False)
for pid in pids:
wait_until(lambda: not node.account.alive(pid), timeout_sec=10, err_msg="Kafka Connect standalone process took too long to exit")
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
for pid in pids:
wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit")
node.account.ssh("rm -f /mnt/connect.pid", allow_fail=False)
node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
def restart(self):
# We don't want to do any clean up here, just restart the process.
for node in self.nodes:
self.logger.info("Restarting Kafka Connect on " + str(node.account))
self.stop_node(node)
self.start_node(node)
def clean_node(self, node):
if len(self.pids(node)) > 0:
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
(self.__class__.__name__, node.account))
for pid in self.pids(node):
node.account.signal(pid, signal.SIGKILL, allow_fail=False)
node.account.ssh("rm -rf /mnt/connect.pid /mnt/connect.log /mnt/connect.properties " + " ".join(self.config_filenames() + self.files), allow_fail=False)
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)
def config_filenames(self):
return ["/mnt/connect-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates or [])]
return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
def list_connectors(self, node=None):
@ -112,6 +126,7 @@ class ConnectServiceBase(Service):
meth = getattr(requests, method.lower())
url = self._base_url(node) + path
self.logger.debug("Kafka Connect REST request: %s %s %s %s", node.account.hostname, url, method, body)
resp = meth(url, json=body)
self.logger.debug("%s %s response: %d", url, method, resp.status_code)
if resp.status_code > 400:
@ -137,19 +152,23 @@ class ConnectStandaloneService(ConnectServiceBase):
return self.nodes[0]
def start_node(self, node):
node.account.create_file("/mnt/connect.properties", self.config_template_func(node))
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
remote_connector_configs = []
for idx, template in enumerate(self.connector_config_templates):
target_file = "/mnt/connect-connector-" + str(idx) + ".properties"
target_file = os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties")
node.account.create_file(target_file, template)
remote_connector_configs.append(target_file)
self.logger.info("Starting Kafka Connect standalone process")
with node.account.monitor_log("/mnt/connect.log") as monitor:
node.account.ssh("/opt/%s/bin/connect-standalone.sh /mnt/connect.properties " % kafka_dir(node) +
self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE +
"/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) +
" ".join(remote_connector_configs) +
" 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > /mnt/connect.pid")
monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup")
(" & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)))
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
@ -164,16 +183,20 @@ class ConnectDistributedService(ConnectServiceBase):
self.configs_topic = configs_topic
def start_node(self, node):
node.account.create_file("/mnt/connect.properties", self.config_template_func(node))
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
if self.connector_config_templates:
raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API")
self.logger.info("Starting Kafka Connect distributed process")
with node.account.monitor_log("/mnt/connect.log") as monitor:
cmd = "/opt/%s/bin/connect-distributed.sh /mnt/connect.properties " % kafka_dir(node)
cmd += " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > /mnt/connect.pid"
self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
node.account.ssh(cmd)
monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup")
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
@ -188,4 +211,75 @@ class ConnectRestError(RuntimeError):
self.url = url
def __unicode__(self):
return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message
return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message
class VerifiableConnector(object):
def messages(self):
"""
Collect and parse the logs from Kafka Connect nodes. Return a list containing all parsed JSON messages generated by
this source.
"""
self.logger.info("Collecting messages from log of %s %s", type(self).__name__, self.name)
records = []
for node in self.cc.nodes:
for line in node.account.ssh_capture('cat ' + self.cc.STDOUT_FILE):
try:
data = json.loads(line)
except ValueError:
self.logger.debug("Ignoring unparseable line: %s", line)
continue
# Filter to only ones matching our name to support multiple verifiable producers
if data['name'] != self.name: continue
data['node'] = node
records.append(data)
return records
def stop(self):
self.logger.info("Destroying connector %s %s", type(self).__name__, self.name)
self.cc.delete_connector(self.name)
class VerifiableSource(VerifiableConnector):
"""
Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output.
"""
def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000):
self.cc = cc
self.logger = self.cc.logger
self.name = name
self.tasks = tasks
self.topic = topic
self.throughput = throughput
def start(self):
self.logger.info("Creating connector VerifiableSourceConnector %s", self.name)
self.cc.create_connector({
'name': self.name,
'connector.class': 'org.apache.kafka.connect.tools.VerifiableSourceConnector',
'tasks.max': self.tasks,
'topic': self.topic,
'throughput': self.throughput
})
class VerifiableSink(VerifiableConnector):
"""
Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output.
"""
def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]):
self.cc = cc
self.logger = self.cc.logger
self.name = name
self.tasks = tasks
self.topics = topics
def start(self):
self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
self.cc.create_connector({
'name': self.name,
'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
'tasks.max': self.tasks,
'topics': ",".join(self.topics)
})

View File

@ -100,7 +100,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
}
def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None,
from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer",
print_key=False, jmx_object_names=None, jmx_attributes=[]):
"""
Args:
context: standard context
@ -114,6 +115,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
successively consumed messages exceeds this timeout. Setting this and
waiting for the consumer to stop is a pretty good way to consume all messages
in a topic.
print_key if True, print each message's key in addition to its value
"""
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
BackgroundThreadService.__init__(self, context, num_nodes)
@ -131,7 +133,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
self.client_id = client_id
self.print_key = print_key
def prop_file(self, node):
"""Return a string which can be used to create a configuration file appropriate for the given node."""
@ -184,6 +186,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
if node.version > LATEST_0_8_2:
cmd += " --timeout-ms %s" % self.consumer_timeout_ms
if self.print_key:
cmd += " --property print.key=true"
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd

View File

@ -29,10 +29,13 @@ import re
import signal
import subprocess
import time
import os.path
class KafkaService(JmxMixin, Service):
PERSISTENT_ROOT = "/mnt"
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties")
logs = {
"kafka_log": {
"path": "/mnt/kafka.log",
@ -104,6 +107,7 @@ class KafkaService(JmxMixin, Service):
def start_cmd(self, node):
cmd = "export JMX_PORT=%d; " % self.jmx_port
cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
@ -114,6 +118,7 @@ class KafkaService(JmxMixin, Service):
self.logger.info("kafka.properties:")
self.logger.info(prop_file)
node.account.create_file("/mnt/kafka.properties", prop_file)
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('log4j.properties'))
self.security_config.setup_node(node)

View File

@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=INFO, kafkaAppender
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false
#Change this to debug to get the actual audit log for authorizer.
log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

View File

@ -0,0 +1,30 @@
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# Define the root logger with appender file
log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File={{ log_file }}
log4j.appender.FILE.ImmediateFlush=true
log4j.appender.FILE.Threshold=debug
log4j.appender.FILE.Append=true
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR

View File

@ -14,11 +14,13 @@
# limitations under the License.
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.connect import ConnectDistributedService
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
from kafkatest.services.console_consumer import ConsoleConsumer
from ducktape.utils.util import wait_until
import hashlib, subprocess, json, itertools
import hashlib, subprocess, json, itertools, time
from collections import Counter
class ConnectDistributedFileTest(KafkaTest):
class ConnectDistributedTest(KafkaTest):
"""
Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
another, validating the total output is identical to the input.
@ -41,18 +43,21 @@ class ConnectDistributedFileTest(KafkaTest):
SCHEMA = { "type": "string", "optional": False }
def __init__(self, test_context):
super(ConnectDistributedFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
'test' : { 'partitions': 1, 'replication-factor': 1 }
})
self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
self.cc.log_level = "DEBUG"
self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
self.schemas = True
def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
assert converter != None, "converter type must be set"
# Template parameters
self.key_converter = converter
self.value_converter = converter
self.schemas = schemas
def test_file_source_and_sink(self):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
"""
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@ -68,7 +73,7 @@ class ConnectDistributedFileTest(KafkaTest):
# do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile
for node in self.cc.nodes:
node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=70, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
# Restarting both should result in them picking up where they left off,
# only processing new data.
@ -76,19 +81,113 @@ class ConnectDistributedFileTest(KafkaTest):
for node in self.cc.nodes:
node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file")
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
def validate_output(self, input):
def test_clean_bounce(self):
"""
Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
"""
num_tasks = 3
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()
self.source = VerifiableSource(self.cc, tasks=num_tasks)
self.source.start()
self.sink = VerifiableSink(self.cc, tasks=num_tasks)
self.sink.start()
for _ in range(3):
for node in self.cc.nodes:
started = time.time()
self.logger.info("Cleanly bouncing Kafka Connect on " + str(node.account))
self.cc.stop_node(node)
with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
self.cc.start_node(node)
monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
err_msg="Kafka Connect worker didn't successfully join group and start work")
self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
self.source.stop()
self.sink.stop()
self.cc.stop()
# Validate at least once delivery of everything that was reported as written since we should have flushed and
# cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed
# all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across
# tasks.
src_msgs = self.source.messages()
sink_msgs = self.sink.messages()
success = True
errors = []
for task in range(num_tasks):
src_seqnos = [msg['seqno'] for msg in src_msgs if msg['task'] == task]
# Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
# bouncing should commit on rebalance.
src_seqno_max = max(src_seqnos)
self.logger.debug("Max source seqno: %d", src_seqno_max)
src_seqno_counts = Counter(src_seqnos)
missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos)))
duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1])
if missing_src_seqnos:
self.logger.error("Missing source sequence numbers for task " + str(task))
errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos))
success = False
if duplicate_src_seqnos:
self.logger.error("Duplicate source sequence numbers for task " + str(task))
errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos))
success = False
sink_seqnos = [msg['seqno'] for msg in sink_msgs if msg['task'] == task and 'flushed' in msg]
# Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because
# clean bouncing should commit on rebalance.
sink_seqno_max = max(sink_seqnos)
self.logger.debug("Max sink seqno: %d", sink_seqno_max)
sink_seqno_counts = Counter(sink_seqnos)
missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos)))
duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1])
if missing_sink_seqnos:
self.logger.error("Missing sink sequence numbers for task " + str(task))
errors.append("Found missing sink sequence numbers for task %d: %s" % (task, missing_sink_seqnos))
success = False
if duplicate_sink_seqnos:
self.logger.error("Duplicate sink sequence numbers for task " + str(task))
errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos))
success = False
if sink_seqno_max > src_seqno_max:
self.logger.error("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max)
errors.append("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max))
success = False
if src_seqno_max < 1000 or sink_seqno_max < 1000:
errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max, sink_seqno_max))
success = False
if not success:
self.mark_for_collect(self.cc)
# Also collect the data in the topic to aid in debugging
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True)
consumer_validator.run()
self.mark_for_collect(consumer_validator, "consumer_stdout")
assert success, "Found validation errors:\n" + "\n ".join(errors)
def _validate_file_output(self, input):
input_set = set(input)
# Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
# Between the first and second rounds, we might even end up with half the data on each node.
output_set = set(itertools.chain(*[
[line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
[line.strip() for line in self._file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
]))
return input_set == output_set
def file_contents(self, node, file):
def _file_contents(self, node, file):
try:
# Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
# immediately

View File

@ -24,6 +24,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.utils.ThroughputThrottler;
public class ProducerPerformance {

View File

@ -41,6 +41,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.ThroughputThrottler;
/**
* Primarily intended for use with system testing, this producer prints metadata