From 8db55618d5d5d5de97feab2bf8da4dc45387a76a Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 10 Nov 2015 14:54:15 -0800 Subject: [PATCH] KAFKA-2752: Add VerifiableSource/Sink connectors and rolling bounce Copycat system tests. Author: Ewen Cheslack-Postava Reviewers: Ben Stopford, Geoff Anderson, Guozhang Wang Closes #432 from ewencp/kafka-2752-copycat-clean-bounce-test --- bin/kafka-run-class.sh | 2 +- build.gradle | 65 +++++++- checkstyle/import-control.xml | 4 + .../common/utils}/ThroughputThrottler.java | 36 +++- .../kafka/connect/source/SourceRecord.java | 5 + .../kafka/connect/runtime/WorkerSinkTask.java | 7 + .../connect/runtime/WorkerSinkTaskThread.java | 6 +- .../connect/runtime/WorkerSourceTask.java | 4 +- .../connect/runtime/rest/RestServer.java | 4 + .../rest/resources/ConnectorsResource.java | 8 +- .../kafka/connect/util/KafkaBasedLog.java | 6 + .../tools/VerifiableSinkConnector.java | 64 ++++++++ .../connect/tools/VerifiableSinkTask.java | 110 +++++++++++++ .../tools/VerifiableSourceConnector.java | 64 ++++++++ .../connect/tools/VerifiableSourceTask.java | 128 +++++++++++++++ settings.gradle | 2 +- tests/kafkatest/services/connect.py | 154 ++++++++++++++---- tests/kafkatest/services/console_consumer.py | 9 +- tests/kafkatest/services/kafka/kafka.py | 7 +- .../services/kafka/templates/log4j.properties | 87 ++++++++++ .../templates/connect_log4j.properties | 30 ++++ .../tests/connect_distributed_test.py | 133 +++++++++++++-- .../kafka/tools/ProducerPerformance.java | 1 + .../kafka/tools/VerifiableProducer.java | 1 + 24 files changed, 873 insertions(+), 64 deletions(-) rename {tools/src/main/java/org/apache/kafka/tools => clients/src/main/java/org/apache/kafka/common/utils}/ThroughputThrottler.java (78%) create mode 100644 connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java create mode 100644 connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java create mode 100644 connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java create mode 100644 connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java create mode 100644 tests/kafkatest/services/kafka/templates/log4j.properties create mode 100644 tests/kafkatest/services/templates/connect_log4j.properties diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index b18a9cfbadd..9962e377fed 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -72,7 +72,7 @@ do CLASSPATH=$CLASSPATH:$dir/* done -for cc_pkg in "api" "runtime" "file" "json" +for cc_pkg in "api" "runtime" "file" "json" "tools" do for file in $base_dir/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar; do diff --git a/build.gradle b/build.gradle index 7f21a00b9b4..70fdbcd8d08 100644 --- a/build.gradle +++ b/build.gradle @@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) { } } -def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file'] +def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools'] def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {} @@ -350,6 +350,8 @@ project(':core') { from(project(':connect:json').configurations.runtime) { into("libs/") } from(project(':connect:file').jar) { into("libs/") } from(project(':connect:file').configurations.runtime) { into("libs/") } + from(project(':connect:tools').jar) { into("libs/") } + from(project(':connect:tools').configurations.runtime) { into("libs/") } } jar { @@ -887,3 +889,64 @@ project(':connect:file') { } test.dependsOn('checkstyleMain', 'checkstyleTest') } + +project(':connect:tools') { + apply plugin: 'checkstyle' + archivesBaseName = "connect-tools" + + dependencies { + compile project(':connect:api') + compile "$slf4japi" + compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" + + testCompile "$junit" + testCompile "$easymock" + testCompile "$powermock" + testCompile "$powermock_easymock" + testRuntime "$slf4jlog4j" + } + + task testJar(type: Jar) { + classifier = 'test' + from sourceSets.test.output + } + + test { + testLogging { + events "passed", "skipped", "failed" + exceptionFormat = 'full' + } + } + + javadoc { + include "**/org/apache/kafka/connect/*" + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.testRuntime) { + include('slf4j-log4j12*') + } + from (configurations.runtime) { + exclude('kafka-clients*') + exclude('connect-*') + } + into "$buildDir/dependant-libs" + } + + jar { + dependsOn copyDependantLibs + } + + artifacts { + archives testJar + } + + configurations { + archives.extendsFrom(testCompile) + } + + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + } + test.dependsOn('checkstyleMain', 'checkstyleTest') +} diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 908fd351d79..16a370092f3 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -205,6 +205,10 @@ + + + + diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java similarity index 78% rename from tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java rename to clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java index d8deb220066..1c63ffb08c5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.tools; +package org.apache.kafka.common.utils; /** @@ -46,6 +46,7 @@ public class ThroughputThrottler { long sleepDeficitNs = 0; long targetThroughput = -1; long startMs; + private boolean wakeup = false; /** * @param targetThroughput Can be messages/sec or bytes/sec @@ -83,7 +84,11 @@ public class ThroughputThrottler { public void throttle() { if (targetThroughput == 0) { try { - Thread.sleep(Long.MAX_VALUE); + synchronized (this) { + while (!wakeup) { + this.wait(); + } + } } catch (InterruptedException e) { // do nothing } @@ -96,12 +101,21 @@ public class ThroughputThrottler { // If enough sleep deficit has accumulated, sleep a little if (sleepDeficitNs >= MIN_SLEEP_NS) { - long sleepMs = sleepDeficitNs / 1000000; - long sleepNs = sleepDeficitNs - sleepMs * 1000000; - long sleepStartNs = System.nanoTime(); + long currentTimeNs = sleepStartNs; try { - Thread.sleep(sleepMs, (int) sleepNs); + synchronized (this) { + long elapsed = currentTimeNs - sleepStartNs; + long remaining = sleepDeficitNs - elapsed; + while (!wakeup && remaining > 0) { + long sleepMs = remaining / 1000000; + long sleepNs = remaining - sleepMs * 1000000; + this.wait(sleepMs, (int) sleepNs); + elapsed = System.nanoTime() - sleepStartNs; + remaining = sleepDeficitNs - elapsed; + } + wakeup = false; + } sleepDeficitNs = 0; } catch (InterruptedException e) { // If sleep is cut short, reduce deficit by the amount of @@ -113,5 +127,15 @@ public class ThroughputThrottler { } } } + + /** + * Wakeup the throttler if its sleeping. + */ + public void wakeup() { + synchronized (this) { + wakeup = true; + this.notifyAll(); + } + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java index 18900624807..b2b29bf7086 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java @@ -56,6 +56,11 @@ public class SourceRecord extends ConnectRecord { this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value); } + public SourceRecord(Map sourcePartition, Map sourceOffset, + String topic, Schema keySchema, Object key, Schema valueSchema, Object value) { + this(sourcePartition, sourceOffset, topic, null, keySchema, key, valueSchema, value); + } + public SourceRecord(Map sourcePartition, Map sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index e0a3e04b639..686e56434e0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -230,6 +230,13 @@ class WorkerSinkTask implements WorkerTask { return workerConfig; } + @Override + public String toString() { + return "WorkerSinkTask{" + + "id=" + id + + '}'; + } + private KafkaConsumer createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java index e776f083e0b..b65efa86da9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java @@ -80,7 +80,7 @@ class WorkerSinkTaskThread extends ShutdownableThread { long commitTimeout = commitStarted + task.workerConfig().getLong( WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); if (committing && now >= commitTimeout) { - log.warn("Commit of {} offsets timed out", this); + log.warn("Commit of {} offsets timed out", task); commitFailures++; committing = false; } @@ -98,11 +98,11 @@ class WorkerSinkTaskThread extends ShutdownableThread { seqno, commitSeqno); } else { if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", this, error); + log.error("Commit of {} offsets threw an unexpected exception: ", task, error); commitFailures++; } else { log.debug("Finished {} offset commit successfully in {} ms", - this, task.time().milliseconds() - commitStarted); + task, task.time().milliseconds() - commitStarted); commitFailures = 0; } committing = false; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 6cf1dd716a2..5d0b7e7bec8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -178,6 +178,8 @@ class WorkerSourceTask implements WorkerTask { public boolean commitOffsets() { long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + log.debug("{} Committing offsets", this); + long started = time.milliseconds(); long timeout = started + commitTimeoutMs; @@ -259,7 +261,7 @@ class WorkerSourceTask implements WorkerTask { } finishSuccessfulFlush(); - log.debug("Finished {} commitOffsets successfully in {} ms", + log.info("Finished {} commitOffsets successfully in {} ms", this, time.milliseconds() - started); return true; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 96346ad4610..a544fb00152 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -133,6 +133,8 @@ public class RestServer { } public void stop() { + log.info("Stopping REST server"); + try { jettyServer.stop(); jettyServer.join(); @@ -141,6 +143,8 @@ public class RestServer { } finally { jettyServer.destroy(); } + + log.info("REST server stopped"); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index cea436047b9..c95b7237c25 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -27,6 +27,8 @@ import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.ServletContext; import javax.ws.rs.Consumes; @@ -51,6 +53,8 @@ import java.util.concurrent.TimeoutException; @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public class ConnectorsResource { + private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class); + // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases, @@ -159,7 +163,9 @@ public class ConnectorsResource { } catch (ExecutionException e) { if (e.getCause() instanceof NotLeaderException) { NotLeaderException notLeaderError = (NotLeaderException) e.getCause(); - return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType)); + String forwardUrl = RestServer.urlJoin(notLeaderError.leaderUrl(), path); + log.debug("Forwarding request to leader: {} {} {}", forwardUrl, method, body); + return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType)); } throw e.getCause(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 3b37076cdcf..c82645cd936 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -189,6 +189,7 @@ public class KafkaBasedLog { * @param callback the callback to invoke once the end of the log has been reached. */ public void readToEnd(Callback callback) { + log.trace("Starting read to end log for topic {}", topic); producer.flush(); synchronized (this) { readLogEndOffsetCallbacks.add(callback); @@ -286,6 +287,10 @@ public class KafkaBasedLog { private class WorkThread extends Thread { + public WorkThread() { + super("KafkaBasedLog Work Thread - " + topic); + } + @Override public void run() { try { @@ -300,6 +305,7 @@ public class KafkaBasedLog { if (numCallbacks > 0) { try { readToLogEnd(); + log.trace("Finished read to end log for topic {}", topic); } catch (WakeupException e) { // Either received another get() call and need to retry reading to end of log or stop() was // called. Both are handled by restarting this loop. diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java new file mode 100644 index 00000000000..0ab64fdfa26 --- /dev/null +++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.tools; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @see VerifiableSinkTask + */ +public class VerifiableSinkConnector extends SourceConnector { + private Map config; + + @Override + public String version() { + return AppInfoParser.getVersion(); + } + + @Override + public void start(Map props) { + this.config = props; + } + + @Override + public Class taskClass() { + return VerifiableSinkTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + ArrayList> configs = new ArrayList<>(); + for (Integer i = 0; i < maxTasks; i++) { + Map props = new HashMap<>(config); + props.put(VerifiableSinkTask.ID_CONFIG, i.toString()); + configs.add(props); + } + return configs; + } + + @Override + public void stop() { + } +} diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java new file mode 100644 index 00000000000..16464a1b789 --- /dev/null +++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Counterpart to {@link VerifiableSourceTask} that consumes records and logs information about each to stdout. This + * allows validation of processing of messages by sink tasks on distributed workers even in the face of worker restarts + * and failures. This task relies on the offset management provided by the Kafka Connect framework and therefore can detect + * bugs in its implementation. + */ +public class VerifiableSinkTask extends SinkTask { + public static final String NAME_CONFIG = "name"; + public static final String ID_CONFIG = "id"; + + private static final ObjectMapper JSON_SERDE = new ObjectMapper(); + + private String name; // Connector name + private int id; // Task ID + + private ArrayList> unflushed = new ArrayList<>(); + + @Override + public String version() { + return new VerifiableSinkConnector().version(); + } + + @Override + public void start(Map props) { + try { + name = props.get(NAME_CONFIG); + id = Integer.parseInt(props.get(ID_CONFIG)); + } catch (NumberFormatException e) { + throw new ConnectException("Invalid VerifiableSourceTask configuration", e); + } + } + + @Override + public void put(Collection records) { + long nowMs = System.currentTimeMillis(); + for (SinkRecord record : records) { + Map data = new HashMap<>(); + data.put("name", name); + data.put("task", record.key()); // VerifiableSourceTask's input task (source partition) + data.put("sinkTask", id); + data.put("topic", record.topic()); + data.put("time_ms", nowMs); + data.put("seqno", record.value()); + data.put("offset", record.kafkaOffset()); + String dataJson; + try { + dataJson = JSON_SERDE.writeValueAsString(data); + } catch (JsonProcessingException e) { + dataJson = "Bad data can't be written as json: " + e.getMessage(); + } + System.out.println(dataJson); + unflushed.add(data); + } + } + + @Override + public void flush(Map offsets) { + long nowMs = System.currentTimeMillis(); + for (Map data : unflushed) { + data.put("time_ms", nowMs); + data.put("flushed", true); + String dataJson; + try { + dataJson = JSON_SERDE.writeValueAsString(data); + } catch (JsonProcessingException e) { + dataJson = "Bad data can't be written as json: " + e.getMessage(); + } + System.out.println(dataJson); + } + unflushed.clear(); + } + + @Override + public void stop() { + + } +} diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java new file mode 100644 index 00000000000..5f9afd5de70 --- /dev/null +++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.tools; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @see VerifiableSourceTask + */ +public class VerifiableSourceConnector extends SourceConnector { + private Map config; + + @Override + public String version() { + return AppInfoParser.getVersion(); + } + + @Override + public void start(Map props) { + this.config = props; + } + + @Override + public Class taskClass() { + return VerifiableSourceTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + ArrayList> configs = new ArrayList<>(); + for (Integer i = 0; i < maxTasks; i++) { + Map props = new HashMap<>(config); + props.put(VerifiableSourceTask.ID_CONFIG, i.toString()); + configs.add(props); + } + return configs; + } + + @Override + public void stop() { + } +} diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java new file mode 100644 index 00000000000..6fee2c4068c --- /dev/null +++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.utils.ThroughputThrottler; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The + * tasks print metadata in the form of JSON to stdout for each message generated, making externally visible which + * messages have been sent. Each message is also assigned a unique, increasing seqno that is passed to Kafka Connect; when + * tasks are started on new nodes, this seqno is used to resume where the task previously left off, allowing for + * testing of distributed Kafka Connect. + * + * If logging is left enabled, log output on stdout can be easily ignored by checking whether a given line is valid JSON. + */ +public class VerifiableSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class); + + public static final String NAME_CONFIG = "name"; + public static final String ID_CONFIG = "id"; + public static final String TOPIC_CONFIG = "topic"; + public static final String THROUGHPUT_CONFIG = "throughput"; + + private static final String ID_FIELD = "id"; + private static final String SEQNO_FIELD = "seqno"; + + private static final ObjectMapper JSON_SERDE = new ObjectMapper(); + + private String name; // Connector name + private int id; // Task ID + private String topic; + private Map partition; + private long startingSeqno; + private long seqno; + private ThroughputThrottler throttler; + + @Override + public String version() { + return new VerifiableSourceConnector().version(); + } + + @Override + public void start(Map props) { + final long throughput; + try { + name = props.get(NAME_CONFIG); + id = Integer.parseInt(props.get(ID_CONFIG)); + topic = props.get(TOPIC_CONFIG); + throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG)); + } catch (NumberFormatException e) { + throw new ConnectException("Invalid VerifiableSourceTask configuration", e); + } + + partition = Collections.singletonMap(ID_FIELD, id); + Map previousOffset = this.context.offsetStorageReader().offset(partition); + if (previousOffset != null) + seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1; + else + seqno = 0; + startingSeqno = seqno; + throttler = new ThroughputThrottler(throughput, System.currentTimeMillis()); + + log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno); + } + + @Override + public List poll() throws InterruptedException { + long sendStartMs = System.currentTimeMillis(); + if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs)) + throttler.throttle(); + + long nowMs = System.currentTimeMillis(); + + Map data = new HashMap<>(); + data.put("name", name); + data.put("task", id); + data.put("topic", this.topic); + data.put("time_ms", nowMs); + data.put("seqno", seqno); + String dataJson; + try { + dataJson = JSON_SERDE.writeValueAsString(data); + } catch (JsonProcessingException e) { + dataJson = "Bad data can't be written as json: " + e.getMessage(); + } + System.out.println(dataJson); + + Map ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno); + SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno); + List result = Arrays.asList(srcRecord); + seqno++; + return result; + } + + @Override + public void stop() { + throttler.wakeup(); + } +} diff --git a/settings.gradle b/settings.gradle index 3d69facd64c..2728b5b4409 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,4 +15,4 @@ apply from: file('scala.gradle') include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender', - 'connect:api', 'connect:runtime', 'connect:json', 'connect:file' + 'connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools' diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index fbac5656082..de593ea851d 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -18,14 +18,30 @@ from ducktape.utils.util import wait_until from ducktape.errors import DucktapeError from kafkatest.services.kafka.directory import kafka_dir -import signal, random, requests +import signal, random, requests, os.path, json class ConnectServiceBase(Service): """Base class for Kafka Connect services providing some common settings and functionality""" + PERSISTENT_ROOT = "/mnt/connect" + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect.properties") + # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately + # so they can be used for other output, e.g. verifiable source & sink. + LOG_FILE = os.path.join(PERSISTENT_ROOT, "connect.log") + STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "connect.stdout") + STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr") + LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties") + PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid") + logs = { - "kafka_log": { - "path": "/mnt/connect.log", + "connect_log": { + "path": LOG_FILE, + "collect_default": True}, + "connect_stdout": { + "path": STDOUT_FILE, + "collect_default": False}, + "connect_stderr": { + "path": STDERR_FILE, "collect_default": True}, } @@ -37,7 +53,7 @@ class ConnectServiceBase(Service): def pids(self, node): """Return process ids for Kafka Connect processes.""" try: - return [pid for pid in node.account.ssh_capture("cat /mnt/connect.pid", callback=int)] + return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)] except: return [] @@ -52,33 +68,31 @@ class ConnectServiceBase(Service): self.connector_config_templates = connector_config_templates def stop_node(self, node, clean_shutdown=True): + self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account)) pids = self.pids(node) sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL for pid in pids: - node.account.signal(pid, sig, allow_fail=False) - for pid in pids: - wait_until(lambda: not node.account.alive(pid), timeout_sec=10, err_msg="Kafka Connect standalone process took too long to exit") + node.account.signal(pid, sig, allow_fail=True) + if clean_shutdown: + for pid in pids: + wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit") - node.account.ssh("rm -f /mnt/connect.pid", allow_fail=False) + node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False) def restart(self): # We don't want to do any clean up here, just restart the process. for node in self.nodes: + self.logger.info("Restarting Kafka Connect on " + str(node.account)) self.stop_node(node) self.start_node(node) def clean_node(self, node): - if len(self.pids(node)) > 0: - self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % - (self.__class__.__name__, node.account)) - for pid in self.pids(node): - node.account.signal(pid, signal.SIGKILL, allow_fail=False) - - node.account.ssh("rm -rf /mnt/connect.pid /mnt/connect.log /mnt/connect.properties " + " ".join(self.config_filenames() + self.files), allow_fail=False) + node.account.kill_process("connect", clean_shutdown=False, allow_fail=True) + node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False) def config_filenames(self): - return ["/mnt/connect-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates or [])] + return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] def list_connectors(self, node=None): @@ -112,6 +126,7 @@ class ConnectServiceBase(Service): meth = getattr(requests, method.lower()) url = self._base_url(node) + path + self.logger.debug("Kafka Connect REST request: %s %s %s %s", node.account.hostname, url, method, body) resp = meth(url, json=body) self.logger.debug("%s %s response: %d", url, method, resp.status_code) if resp.status_code > 400: @@ -137,19 +152,23 @@ class ConnectStandaloneService(ConnectServiceBase): return self.nodes[0] def start_node(self, node): - node.account.create_file("/mnt/connect.properties", self.config_template_func(node)) + node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + + node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE)) remote_connector_configs = [] for idx, template in enumerate(self.connector_config_templates): - target_file = "/mnt/connect-connector-" + str(idx) + ".properties" + target_file = os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") node.account.create_file(target_file, template) remote_connector_configs.append(target_file) - self.logger.info("Starting Kafka Connect standalone process") - with node.account.monitor_log("/mnt/connect.log") as monitor: - node.account.ssh("/opt/%s/bin/connect-standalone.sh /mnt/connect.properties " % kafka_dir(node) + + self.logger.info("Starting Kafka Connect standalone process on " + str(node.account)) + with node.account.monitor_log(self.LOG_FILE) as monitor: + node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + + "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) + " ".join(remote_connector_configs) + - " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > /mnt/connect.pid") - monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup") + (" & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE))) + monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account)) if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") @@ -164,16 +183,20 @@ class ConnectDistributedService(ConnectServiceBase): self.configs_topic = configs_topic def start_node(self, node): - node.account.create_file("/mnt/connect.properties", self.config_template_func(node)) + node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + + node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE)) if self.connector_config_templates: raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API") - self.logger.info("Starting Kafka Connect distributed process") - with node.account.monitor_log("/mnt/connect.log") as monitor: - cmd = "/opt/%s/bin/connect-distributed.sh /mnt/connect.properties " % kafka_dir(node) - cmd += " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > /mnt/connect.pid" + self.logger.info("Starting Kafka Connect distributed process on " + str(node.account)) + with node.account.monitor_log(self.LOG_FILE) as monitor: + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE) + cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE) node.account.ssh(cmd) - monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup") + monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account)) if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") @@ -188,4 +211,75 @@ class ConnectRestError(RuntimeError): self.url = url def __unicode__(self): - return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message \ No newline at end of file + return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message + + + +class VerifiableConnector(object): + def messages(self): + """ + Collect and parse the logs from Kafka Connect nodes. Return a list containing all parsed JSON messages generated by + this source. + """ + self.logger.info("Collecting messages from log of %s %s", type(self).__name__, self.name) + records = [] + for node in self.cc.nodes: + for line in node.account.ssh_capture('cat ' + self.cc.STDOUT_FILE): + try: + data = json.loads(line) + except ValueError: + self.logger.debug("Ignoring unparseable line: %s", line) + continue + # Filter to only ones matching our name to support multiple verifiable producers + if data['name'] != self.name: continue + data['node'] = node + records.append(data) + return records + + def stop(self): + self.logger.info("Destroying connector %s %s", type(self).__name__, self.name) + self.cc.delete_connector(self.name) + +class VerifiableSource(VerifiableConnector): + """ + Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output. + """ + + def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000): + self.cc = cc + self.logger = self.cc.logger + self.name = name + self.tasks = tasks + self.topic = topic + self.throughput = throughput + + def start(self): + self.logger.info("Creating connector VerifiableSourceConnector %s", self.name) + self.cc.create_connector({ + 'name': self.name, + 'connector.class': 'org.apache.kafka.connect.tools.VerifiableSourceConnector', + 'tasks.max': self.tasks, + 'topic': self.topic, + 'throughput': self.throughput + }) + +class VerifiableSink(VerifiableConnector): + """ + Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output. + """ + + def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]): + self.cc = cc + self.logger = self.cc.logger + self.name = name + self.tasks = tasks + self.topics = topics + + def start(self): + self.logger.info("Creating connector VerifiableSinkConnector %s", self.name) + self.cc.create_connector({ + 'name': self.name, + 'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector', + 'tasks.max': self.tasks, + 'topics': ",".join(self.topics) + }) diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 18021d97f99..84d358d5a05 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -100,7 +100,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): } def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None, - from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]): + from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", + print_key=False, jmx_object_names=None, jmx_attributes=[]): """ Args: context: standard context @@ -114,6 +115,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): successively consumed messages exceeds this timeout. Setting this and waiting for the consumer to stop is a pretty good way to consume all messages in a topic. + print_key if True, print each message's key in addition to its value """ JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes) BackgroundThreadService.__init__(self, context, num_nodes) @@ -131,7 +133,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} self.client_id = client_id - + self.print_key = print_key def prop_file(self, node): """Return a string which can be used to create a configuration file appropriate for the given node.""" @@ -184,6 +186,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): if node.version > LATEST_0_8_2: cmd += " --timeout-ms %s" % self.consumer_timeout_ms + if self.print_key: + cmd += " --property print.key=true" + cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 27530f0c4ff..a7a1581fde7 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -29,10 +29,13 @@ import re import signal import subprocess import time - +import os.path class KafkaService(JmxMixin, Service): + PERSISTENT_ROOT = "/mnt" + LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties") + logs = { "kafka_log": { "path": "/mnt/kafka.log", @@ -104,6 +107,7 @@ class KafkaService(JmxMixin, Service): def start_cmd(self, node): cmd = "export JMX_PORT=%d; " % self.jmx_port + cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; " cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &" @@ -114,6 +118,7 @@ class KafkaService(JmxMixin, Service): self.logger.info("kafka.properties:") self.logger.info(prop_file) node.account.create_file("/mnt/kafka.properties", prop_file) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('log4j.properties')) self.security_config.setup_node(node) diff --git a/tests/kafkatest/services/kafka/templates/log4j.properties b/tests/kafkatest/services/kafka/templates/log4j.properties new file mode 100644 index 00000000000..bf816e76d2f --- /dev/null +++ b/tests/kafkatest/services/kafka/templates/log4j.properties @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log +log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +#log4j.logger.kafka.perf=DEBUG, kafkaAppender +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.additivity.kafka.log.LogCleaner=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false + +#Change this to debug to get the actual audit log for authorizer. +log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender +log4j.additivity.kafka.authorizer.logger=false + diff --git a/tests/kafkatest/services/templates/connect_log4j.properties b/tests/kafkatest/services/templates/connect_log4j.properties new file mode 100644 index 00000000000..d62a93d81a8 --- /dev/null +++ b/tests/kafkatest/services/templates/connect_log4j.properties @@ -0,0 +1,30 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +# Define the root logger with appender file +log4j.rootLogger = {{ log_level|default("INFO") }}, FILE + +log4j.appender.FILE=org.apache.log4j.FileAppender +log4j.appender.FILE.File={{ log_file }} +log4j.appender.FILE.ImmediateFlush=true +log4j.appender.FILE.Threshold=debug +log4j.appender.FILE.Append=true +log4j.appender.FILE.layout=org.apache.log4j.PatternLayout +log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n + +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.I0Itec.zkclient=ERROR diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py index 55901c2c94d..4689f3682f9 100644 --- a/tests/kafkatest/tests/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect_distributed_test.py @@ -14,11 +14,13 @@ # limitations under the License. from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.connect import ConnectDistributedService +from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink +from kafkatest.services.console_consumer import ConsoleConsumer from ducktape.utils.util import wait_until -import hashlib, subprocess, json, itertools +import hashlib, subprocess, json, itertools, time +from collections import Counter -class ConnectDistributedFileTest(KafkaTest): +class ConnectDistributedTest(KafkaTest): """ Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on another, validating the total output is identical to the input. @@ -41,18 +43,21 @@ class ConnectDistributedFileTest(KafkaTest): SCHEMA = { "type": "string", "optional": False } def __init__(self, test_context): - super(ConnectDistributedFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ + super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ 'test' : { 'partitions': 1, 'replication-factor': 1 } }) - self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) + self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) + self.cc.log_level = "DEBUG" + self.key_converter = "org.apache.kafka.connect.json.JsonConverter" + self.value_converter = "org.apache.kafka.connect.json.JsonConverter" + self.schemas = True - def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True): - assert converter != None, "converter type must be set" - # Template parameters - self.key_converter = converter - self.value_converter = converter - self.schemas = schemas + def test_file_source_and_sink(self): + """ + Tests that a basic file connector works across clean rolling bounces. This validates that the connector is + correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes. + """ self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) @@ -68,7 +73,7 @@ class ConnectDistributedFileTest(KafkaTest): # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile for node in self.cc.nodes: node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE) - wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") + wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=70, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") # Restarting both should result in them picking up where they left off, # only processing new data. @@ -76,19 +81,113 @@ class ConnectDistributedFileTest(KafkaTest): for node in self.cc.nodes: node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE) - wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file") + wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file") - def validate_output(self, input): + + def test_clean_bounce(self): + """ + Validates that source and sink tasks that run continuously and produce a predictable sequence of messages + run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces. + """ + num_tasks = 3 + + self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) + self.cc.start() + + self.source = VerifiableSource(self.cc, tasks=num_tasks) + self.source.start() + self.sink = VerifiableSink(self.cc, tasks=num_tasks) + self.sink.start() + + for _ in range(3): + for node in self.cc.nodes: + started = time.time() + self.logger.info("Cleanly bouncing Kafka Connect on " + str(node.account)) + self.cc.stop_node(node) + with node.account.monitor_log(self.cc.LOG_FILE) as monitor: + self.cc.start_node(node) + monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90, + err_msg="Kafka Connect worker didn't successfully join group and start work") + self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started) + + self.source.stop() + self.sink.stop() + self.cc.stop() + + # Validate at least once delivery of everything that was reported as written since we should have flushed and + # cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed + # all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across + # tasks. + src_msgs = self.source.messages() + sink_msgs = self.sink.messages() + success = True + errors = [] + for task in range(num_tasks): + src_seqnos = [msg['seqno'] for msg in src_msgs if msg['task'] == task] + # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean + # bouncing should commit on rebalance. + src_seqno_max = max(src_seqnos) + self.logger.debug("Max source seqno: %d", src_seqno_max) + src_seqno_counts = Counter(src_seqnos) + missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) + duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1]) + + if missing_src_seqnos: + self.logger.error("Missing source sequence numbers for task " + str(task)) + errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos)) + success = False + if duplicate_src_seqnos: + self.logger.error("Duplicate source sequence numbers for task " + str(task)) + errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos)) + success = False + + sink_seqnos = [msg['seqno'] for msg in sink_msgs if msg['task'] == task and 'flushed' in msg] + # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because + # clean bouncing should commit on rebalance. + sink_seqno_max = max(sink_seqnos) + self.logger.debug("Max sink seqno: %d", sink_seqno_max) + sink_seqno_counts = Counter(sink_seqnos) + missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos))) + duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1]) + + if missing_sink_seqnos: + self.logger.error("Missing sink sequence numbers for task " + str(task)) + errors.append("Found missing sink sequence numbers for task %d: %s" % (task, missing_sink_seqnos)) + success = False + if duplicate_sink_seqnos: + self.logger.error("Duplicate sink sequence numbers for task " + str(task)) + errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos)) + success = False + + + if sink_seqno_max > src_seqno_max: + self.logger.error("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max) + errors.append("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max)) + success = False + + if src_seqno_max < 1000 or sink_seqno_max < 1000: + errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max, sink_seqno_max)) + success = False + + if not success: + self.mark_for_collect(self.cc) + # Also collect the data in the topic to aid in debugging + consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) + consumer_validator.run() + self.mark_for_collect(consumer_validator, "consumer_stdout") + assert success, "Found validation errors:\n" + "\n ".join(errors) + + + def _validate_file_output(self, input): input_set = set(input) # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled. # Between the first and second rounds, we might even end up with half the data on each node. output_set = set(itertools.chain(*[ - [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes + [line.strip() for line in self._file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes ])) return input_set == output_set - - def file_contents(self, node, file): + def _file_contents(self, node, file): try: # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of # immediately diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 3a068626c5c..2a7f7b18e12 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -24,6 +24,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.utils.ThroughputThrottler; public class ProducerPerformance { diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 0cd90c0bbf5..e8bd33025e1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -41,6 +41,7 @@ import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.utils.ThroughputThrottler; /** * Primarily intended for use with system testing, this producer prints metadata