MINOR: update VerifiableProducer to send keys if configured and removed StreamsRepeatingKeyProducerService (#4841)

This PR does the following:

* Remove the StreamsRepeatingIntegerKeyProducerService and the associated Java class
* Add a parameter to VerifiableProducer.java to enable sending keys when specified
* Update the corresponding Python file verifiable_producer.py to support the new parameter

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Bill Bejeck authored on 2018-04-28 01:12:57 -04:00; committed by Guozhang Wang
parent 0143a65091
commit c6fd3d488e
6 changed files with 61 additions and 157 deletions
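
As a reader's aid, the key sequence the new --repeating-keys N option produces can be modeled in a few lines of Python. This is an illustrative sketch of the semantics only, not code from the patch; the actual implementation is the getKey() method added to VerifiableProducer.java below.

def repeating_keys(limit, num_messages):
    """Model of --repeating-keys N: keys count up from 0 to N - 1
    (as strings), then wrap back to 0, mirroring VerifiableProducer.getKey()."""
    counter = 0
    for _ in range(num_messages):
        yield str(counter)
        counter += 1
        if counter == limit:
            counter = 0

# With a limit of 3, seven messages get the keys 0,1,2,0,1,2,0.
assert list(repeating_keys(3, 7)) == ["0", "1", "2", "0", "1", "2", "0"]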

StreamsRepeatingIntegerKeyProducer.java (org.apache.kafka.streams.tests, deleted)

@@ -1,123 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.tests;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

/**
 * Utility class used to send messages with integer keys
 * repeating in sequence every 1000 messages. Multiple topics for publishing
 * can be provided in the config map with a key of 'topics' and a ';'-delimited list of output topics.
 */
public class StreamsRepeatingIntegerKeyProducer {

    private static volatile boolean keepProducing = true;
    private static volatile int messageCounter = 0;

    public static void main(final String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("StreamsRepeatingIntegerKeyProducer expects two parameters: propFile, additionalConfigs; but only saw " + args.length + " parameter(s)");
            System.exit(1);
        }

        System.out.println("StreamsTest instance started");

        final String propFileName = args[0];
        final String configString = args[1];

        final Properties streamsProperties = Utils.loadProps(propFileName);
        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);

        if (kafka == null) {
            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
            System.exit(1);
        }

        final Map<String, String> configs = SystemTestUtil.parseConfigs(configString);
        System.out.println("Using provided configs " + configs);

        final int numMessages = configs.containsKey("num_messages") ? Integer.parseInt(configs.get("num_messages")) : 1000;

        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "StreamsRepeatingIntegerKeyProducer");
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        final String value = "testingValue";
        Integer key = 0;

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                keepProducing = false;
            }
        }));

        final String[] topics = configs.get("topics").split(";");
        final int totalMessagesToProduce = numMessages * topics.length;

        try (final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProps)) {
            while (keepProducing && messageCounter < totalMessagesToProduce) {
                for (final String topic : topics) {
                    final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key.toString(), value + key);
                    kafkaProducer.send(producerRecord, new Callback() {
                        @Override
                        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace(System.err);
                                System.err.flush();
                                if (exception instanceof TimeoutException) {
                                    try {
                                        // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
                                        final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
                                        messageCounter -= expired;
                                    } catch (final Exception ignore) {
                                    }
                                }
                            }
                        }
                    });
                    messageCounter += 1;
                }
                key += 1;
                if (key % 1000 == 0) {
                    System.out.println("Sent 1000 messages");
                    Utils.sleep(100);
                    key = 0;
                }
            }
        }
        System.out.println("Producer shut down now, sent total " + messageCounter + " of requested " + totalMessagesToProduce);
        System.out.flush();
    }
}
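
A note on the removed driver's inputs: its second argument was a comma-separated config string, with multiple output topics packed under 'topics' as a ';'-delimited list. The following Python sketch models that parsing; the exact behavior of SystemTestUtil.parseConfigs is assumed here, inferred from the driver_configs string the old test (below) built.

def parse_configs(config_string):
    """Assumed model of SystemTestUtil.parseConfigs: comma-separated
    key=value pairs; the 'topics' value is a ';'-delimited topic list."""
    return dict(pair.split("=", 1) for pair in config_string.split(","))

configs = parse_configs("num_messages=60000,topics=standbyTaskSource1")
topics = configs["topics"].split(";")
num_messages = int(configs.get("num_messages", "1000"))  # the driver defaulted to 1000
assert topics == ["standbyTaskSource1"] and num_messages == 60000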

streams.py (kafkatest.services)

@@ -397,12 +397,6 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
                                                          configs)

-
-class StreamsRepeatingIntegerKeyProducerService(StreamsTestBaseService):
-    def __init__(self, test_context, kafka, configs):
-        super(StreamsRepeatingIntegerKeyProducerService, self).__init__(test_context,
-                                                                        kafka,
-                                                                        "org.apache.kafka.streams.tests.StreamsRepeatingIntegerKeyProducer",
-                                                                        configs)

 class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,

verifiable_producer.py (kafkatest.services)

@@ -15,16 +15,16 @@
 import json
 import os
 import time

-from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.services.background_thread import BackgroundThreadService

 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import DEV_BRANCH


 class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
     """This service wraps org.apache.kafka.tools.VerifiableProducer for use in
     system testing.
@@ -57,7 +57,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):

     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
                  stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
-                 enable_idempotence=False, offline_nodes=[], create_time=-1):
+                 enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -93,6 +93,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
         self.enable_idempotence = enable_idempotence
         self.offline_nodes = offline_nodes
         self.create_time = create_time
+        self.repeating_keys = repeating_keys

     def java_class_name(self):
         return "VerifiableProducer"
@@ -198,6 +199,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
             cmd += " --acks %s " % str(self.acks)
         if self.create_time > -1:
             cmd += " --message-create-time %s " % str(self.create_time)
+        if self.repeating_keys is not None:
+            cmd += " --repeating-keys %s " % str(self.repeating_keys)

         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)

base_streams_test.py (kafkatest.tests.streams)

@@ -38,13 +38,14 @@ class BaseStreamsTest(KafkaTest):
                             client_id,
                             max_messages=num_messages)

-    def get_producer(self, topic, num_messages):
+    def get_producer(self, topic, num_messages, repeating_keys=None):
         return VerifiableProducer(self.test_context,
                                   1,
                                   self.kafka,
                                   topic,
                                   max_messages=num_messages,
-                                  acks=1)
+                                  acks=1,
+                                  repeating_keys=repeating_keys)

     def assert_produce_consume(self,
                                streams_source_topic,

streams_standby_replica_test.py (kafkatest.tests.streams)

@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from kafkatest.services.streams import StreamsRepeatingIntegerKeyProducerService
+from ducktape.utils.util import wait_until
 from kafkatest.services.streams import StreamsStandbyTaskService
 from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
@@ -30,7 +30,7 @@ class StreamsStandbyTask(BaseStreamsTest):
     streams_sink_topic_2 = "standbyTaskSink2"
     client_id = "stream-broker-resilience-verify-consumer"
-    num_messages = 60000
+    num_messages = 300000

     def __init__(self, test_context):
         super(StreamsStandbyTask, self).__init__(test_context,
@@ -42,15 +42,13 @@ class StreamsStandbyTask(BaseStreamsTest):

     def test_standby_tasks_rebalance(self):
-        driver_configs = "num_messages=%s,topics=%s" % (str(self.num_messages), self.streams_source_topic)
-
-        driver = StreamsRepeatingIntegerKeyProducerService(self.test_context, self.kafka, driver_configs)
-        driver.start()
-
         configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
                                                                                     self.streams_sink_topic_1,
                                                                                     self.streams_sink_topic_2))

+        producer = self.get_producer(self.streams_source_topic, self.num_messages, repeating_keys=6)
+        producer.start()
+
         processor_1 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
         processor_2 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
         processor_3 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
@@ -113,7 +111,10 @@ class StreamsStandbyTask(BaseStreamsTest):
         self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
         self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)

-        self.wait_for_verification(driver, "Producer shut down now, sent total {0} of requested {0}".format(str(self.num_messages)),
-                                   driver.STDOUT_FILE)
+        wait_until(lambda: producer.num_acked >= self.num_messages,
+                   timeout_sec=60,
+                   err_msg="Failed to send all %s messages" % str(self.num_messages))
+
+        producer.stop()
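
For readers unfamiliar with ducktape, wait_until polls a predicate until it returns truthy or the timeout expires. A minimal stand-in with the same contract (this is a sketch; the real helper lives in ducktape.utils.util):

import time

def wait_until(condition, timeout_sec, backoff_sec=.1, err_msg=""):
    """Poll condition until it is truthy or timeout_sec elapses."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)

# Usage mirrors the test above:
# wait_until(lambda: producer.num_acked >= num_messages, timeout_sec=60,
#            err_msg="Failed to send all messages")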

VerifiableProducer.java (org.apache.kafka.tools)

@@ -18,15 +18,22 @@ package org.apache.kafka.tools;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;

 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;

 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -36,13 +43,6 @@ import java.util.Properties;

 import static net.sourceforge.argparse4j.impl.Arguments.store;

-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Exit;
-
 /**
  * Primarily intended for use with system testing, this producer prints metadata
  * in the form of JSON to stdout on each "send" request. For example, this helps
@@ -80,13 +80,20 @@ public class VerifiableProducer {
     // if null, then values are produced without a prefix
     private final Integer valuePrefix;

+    // Send messages with a key of 0, incrementing by 1 for
+    // each message produced; when the specified number is reached,
+    // the key is reset to 0
+    private final Integer repeatingKeys;
+
+    private int keyCounter;
+
     // The create time to set in messages, in milliseconds since epoch
     private Long createTime;

     private final Long startTime;

     public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages,
-                              Integer valuePrefix, Long createTime) {
+                              Integer valuePrefix, Long createTime, Integer repeatingKeys) {

         this.topic = topic;
         this.throughput = throughput;
@@ -170,6 +178,14 @@ public class VerifiableProducer {
                 .dest("valuePrefix")
                 .help("If specified, each produced value will have this prefix with a dot separator");

+        parser.addArgument("--repeating-keys")
+                .action(store())
+                .required(false)
+                .type(Integer.class)
+                .metavar("REPEATING-KEYS")
+                .dest("repeatingKeys")
+                .help("If specified, each produced record will have a key starting at 0, incremented by 1 up to the number specified (exclusive), after which the key resets to 0");
+
         return parser;
     }
@@ -200,6 +216,7 @@ public class VerifiableProducer {
         String configFile = res.getString("producer.config");
         Integer valuePrefix = res.getInt("valuePrefix");
         Long createTime = (long) res.getInt("createTime");
+        Integer repeatingKeys = res.getInt("repeatingKeys");

         if (createTime == -1L)
             createTime = null;
@@ -224,7 +241,7 @@ public class VerifiableProducer {
         StringSerializer serializer = new StringSerializer();
         KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);

-        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime);
+        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime, repeatingKeys);
     }

     /** Produce a message with given key and value. */
@@ -260,6 +277,17 @@ public class VerifiableProducer {
         return String.format("%d", val);
     }

+    public String getKey() {
+        String key = null;
+        if (repeatingKeys != null) {
+            key = Integer.toString(keyCounter++);
+            if (keyCounter == repeatingKeys) {
+                keyCounter = 0;
+            }
+        }
+        return key;
+    }
+
     /** Close the producer to flush any remaining messages. */
     public void close() {
         producer.close();
@@ -468,7 +496,7 @@ public class VerifiableProducer {
             }

             long sendStartMs = System.currentTimeMillis();
-            this.send(null, this.getValue(i));
+            this.send(this.getKey(), this.getValue(i));

             if (throttler.shouldThrottle(i, sendStartMs)) {
                 throttler.throttle();