diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java deleted file mode 100644 index 15a9fa0465c..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.tests; - -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.StreamsConfig; - -import java.io.IOException; -import java.util.Map; -import java.util.Properties; - -/** - * Utility class used to send messages with integer keys - * repeating in sequence every 1000 messages. Multiple topics for publishing - * can be provided in the config map with key of 'topics' and ';' delimited list of output topics - */ -public class StreamsRepeatingIntegerKeyProducer { - - private static volatile boolean keepProducing = true; - private volatile static int messageCounter = 0; - - public static void main(final String[] args) throws IOException { - if (args.length < 2) { - System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter"); - System.exit(1); - } - - System.out.println("StreamsTest instance started"); - - final String propFileName = args[0]; - final String configString = args[1]; - - final Properties streamsProperties = Utils.loadProps(propFileName); - final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); - - if (kafka == null) { - System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); - System.exit(1); - } - - final Map configs = SystemTestUtil.parseConfigs(configString); - System.out.println("Using provided configs " + configs); - - final int numMessages = configs.containsKey("num_messages") ? Integer.parseInt(configs.get("num_messages")) : 1000; - - final Properties producerProps = new Properties(); - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "StreamsRepeatingIntegerKeyProducer"); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - - final String value = "testingValue"; - Integer key = 0; - - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - keepProducing = false; - } - })); - - final String[] topics = configs.get("topics").split(";"); - final int totalMessagesToProduce = numMessages * topics.length; - - try (final KafkaProducer kafkaProducer = new KafkaProducer<>(producerProps)) { - - while (keepProducing && messageCounter < totalMessagesToProduce) { - for (final String topic : topics) { - final ProducerRecord producerRecord = new ProducerRecord<>(topic, key.toString(), value + key); - kafkaProducer.send(producerRecord, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - if (exception instanceof TimeoutException) { - try { - // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time - final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); - messageCounter -= expired; - } catch (final Exception ignore) { - } - } - } - } - }); - messageCounter += 1; - } - key += 1; - if (key % 1000 == 0) { - System.out.println("Sent 1000 messages"); - Utils.sleep(100); - key = 0; - } - } - } - System.out.println("Producer shut down now, sent total " + messageCounter + " of requested " + totalMessagesToProduce); - System.out.flush(); - } -} diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index ec6a08157f1..f268ab8de59 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -397,12 +397,6 @@ class StreamsStandbyTaskService(StreamsTestBaseService): configs) -class StreamsRepeatingIntegerKeyProducerService(StreamsTestBaseService): - def __init__(self, test_context, kafka, configs): - super(StreamsRepeatingIntegerKeyProducerService, self).__init__(test_context, - kafka, - "org.apache.kafka.streams.tests.StreamsRepeatingIntegerKeyProducer", - configs) class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): def __init__(self, test_context, kafka): super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context, diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 17f1ec36cb4..cbce27e1785 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -15,16 +15,16 @@ import json import os + import time - -from ducktape.services.background_thread import BackgroundThreadService from ducktape.cluster.remoteaccount import RemoteCommandError - +from ducktape.services.background_thread import BackgroundThreadService from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.verifiable_client import VerifiableClientMixin from kafkatest.utils import is_int, is_int_with_prefix from kafkatest.version import DEV_BRANCH + class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService): """This service wraps org.apache.kafka.tools.VerifiableProducer for use in system testing. @@ -57,7 +57,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000, message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None, stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", - enable_idempotence=False, offline_nodes=[], create_time=-1): + enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None): """ :param max_messages is a number of messages to be produced per producer :param message_validator checks for an expected format of messages produced. There are @@ -93,6 +93,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou self.enable_idempotence = enable_idempotence self.offline_nodes = offline_nodes self.create_time = create_time + self.repeating_keys = repeating_keys def java_class_name(self): return "VerifiableProducer" @@ -198,6 +199,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou cmd += " --acks %s " % str(self.acks) if self.create_time > -1: cmd += " --message-create-time %s " % str(self.create_time) + if self.repeating_keys is not None: + cmd += " --repeating-keys %s " % str(self.repeating_keys) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py index 70da44e2454..320d4b2068b 100644 --- a/tests/kafkatest/tests/streams/base_streams_test.py +++ b/tests/kafkatest/tests/streams/base_streams_test.py @@ -38,13 +38,14 @@ class BaseStreamsTest(KafkaTest): client_id, max_messages=num_messages) - def get_producer(self, topic, num_messages): + def get_producer(self, topic, num_messages, repeating_keys=None): return VerifiableProducer(self.test_context, 1, self.kafka, topic, max_messages=num_messages, - acks=1) + acks=1, + repeating_keys=repeating_keys) def assert_produce_consume(self, streams_source_topic, diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py index e901cb37f00..416a110f064 100644 --- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py +++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafkatest.services.streams import StreamsRepeatingIntegerKeyProducerService +from ducktape.utils.util import wait_until from kafkatest.services.streams import StreamsStandbyTaskService from kafkatest.tests.streams.base_streams_test import BaseStreamsTest @@ -30,7 +30,7 @@ class StreamsStandbyTask(BaseStreamsTest): streams_sink_topic_2 = "standbyTaskSink2" client_id = "stream-broker-resilience-verify-consumer" - num_messages = 60000 + num_messages = 300000 def __init__(self, test_context): super(StreamsStandbyTask, self).__init__(test_context, @@ -42,15 +42,13 @@ class StreamsStandbyTask(BaseStreamsTest): def test_standby_tasks_rebalance(self): - driver_configs = "num_messages=%s,topics=%s" % (str(self.num_messages), self.streams_source_topic) - - driver = StreamsRepeatingIntegerKeyProducerService(self.test_context, self.kafka, driver_configs) - driver.start() - configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic, self.streams_sink_topic_1, self.streams_sink_topic_2)) + producer = self.get_producer(self.streams_source_topic, self.num_messages, repeating_keys=6) + producer.start() + processor_1 = StreamsStandbyTaskService(self.test_context, self.kafka, configs) processor_2 = StreamsStandbyTaskService(self.test_context, self.kafka, configs) processor_3 = StreamsStandbyTaskService(self.test_context, self.kafka, configs) @@ -113,7 +111,10 @@ class StreamsStandbyTask(BaseStreamsTest): self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages) self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages) - self.wait_for_verification(driver, "Producer shut down now, sent total {0} of requested {0}".format(str(self.num_messages)), - driver.STDOUT_FILE) + wait_until(lambda: producer.num_acked >= self.num_messages, + timeout_sec=60, + err_msg="Failed to send all %s messages" % str(self.num_messages)) + + producer.stop() diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 975cba71497..744142b2f64 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -18,15 +18,22 @@ package org.apache.kafka.tools; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; + import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Exit; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -36,13 +43,6 @@ import java.util.Properties; import static net.sourceforge.argparse4j.impl.Arguments.store; -import net.sourceforge.argparse4j.ArgumentParsers; -import net.sourceforge.argparse4j.inf.ArgumentParser; -import net.sourceforge.argparse4j.inf.ArgumentParserException; -import net.sourceforge.argparse4j.inf.Namespace; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Exit; - /** * Primarily intended for use with system testing, this producer prints metadata * in the form of JSON to stdout on each "send" request. For example, this helps @@ -80,13 +80,20 @@ public class VerifiableProducer { // if null, then values are produced without a prefix private final Integer valuePrefix; + // Send messages with a key of 0 incrementing by 1 for + // each message produced when number specified is reached + // key is reset to 0 + private final Integer repeatingKeys; + + private int keyCounter; + // The create time to set in messages, in milliseconds since epoch private Long createTime; private final Long startTime; public VerifiableProducer(KafkaProducer producer, String topic, int throughput, int maxMessages, - Integer valuePrefix, Long createTime) { + Integer valuePrefix, Long createTime, Integer repeatingKeys) { this.topic = topic; this.throughput = throughput; @@ -95,6 +102,7 @@ public class VerifiableProducer { this.valuePrefix = valuePrefix; this.createTime = createTime; this.startTime = System.currentTimeMillis(); + this.repeatingKeys = repeatingKeys; } @@ -170,6 +178,14 @@ public class VerifiableProducer { .dest("valuePrefix") .help("If specified, each produced value will have this prefix with a dot separator"); + parser.addArgument("--repeating-keys") + .action(store()) + .required(false) + .type(Integer.class) + .metavar("REPEATING-KEYS") + .dest("repeatingKeys") + .help("If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified (exclusive), then the key is set to 0 again"); + return parser; } @@ -200,6 +216,7 @@ public class VerifiableProducer { String configFile = res.getString("producer.config"); Integer valuePrefix = res.getInt("valuePrefix"); Long createTime = (long) res.getInt("createTime"); + Integer repeatingKeys = res.getInt("repeatingKeys"); if (createTime == -1L) createTime = null; @@ -224,7 +241,7 @@ public class VerifiableProducer { StringSerializer serializer = new StringSerializer(); KafkaProducer producer = new KafkaProducer<>(producerProps, serializer, serializer); - return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime); + return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime, repeatingKeys); } /** Produce a message with given key and value. */ @@ -260,6 +277,17 @@ public class VerifiableProducer { return String.format("%d", val); } + public String getKey() { + String key = null; + if (repeatingKeys != null) { + key = Integer.toString(keyCounter++); + if (keyCounter == repeatingKeys) { + keyCounter = 0; + } + } + return key; + } + /** Close the producer to flush any remaining messages. */ public void close() { producer.close(); @@ -468,7 +496,7 @@ public class VerifiableProducer { } long sendStartMs = System.currentTimeMillis(); - this.send(null, this.getValue(i)); + this.send(this.getKey(), this.getValue(i)); if (throttler.shouldThrottle(i, sendStartMs)) { throttler.throttle();