MINOR: Add PayloadGenerator to Trogdor (#4640)

It generates the producer payload (key and value) and makes sure that the values are
populated to target a realistic compression rate (0.3 - 0.4) if compression is used.
The generated payload is deterministic and can be replayed from a given position.
For now, all generated values are constant size, and key types can be configured
to be either null or 8 bytes.

Added messageSize parameter to producer spec, that specifies produced
key + message size.
This commit is contained in:
Anna Povzner 2018-03-09 13:57:04 -08:00 committed by Ismael Juma
parent b1aa1912f0
commit f1c112c63d
6 changed files with 356 additions and 18 deletions

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
/**
* Describes the payload for the producer record. Currently, it generates constant size values
* and either null keys or constant size key (depending on requested key type). The generator
* is deterministic -- two generator objects created with the same key type, message size, and
* value divergence ratio (see `valueDivergenceRatio` description) will generate the same sequence
* of key/value pairs.
*/
public class PayloadGenerator {
public static final double DEFAULT_VALUE_DIVERGENCE_RATIO = 0.3;
public static final int DEFAULT_MESSAGE_SIZE = 512;
/**
* This is the ratio of how much each next value is different from the previous value. This
* is directly related to compression rate we will get. Example: 0.3 divergence ratio gets us
* about 0.3 - 0.45 compression rate with lz4.
*/
private final double valueDivergenceRatio;
private final long baseSeed;
private long currentPosition;
private byte[] baseRecordValue;
private PayloadKeyType recordKeyType;
private Random random;
public PayloadGenerator() {
this(DEFAULT_MESSAGE_SIZE, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
}
/**
* Generator will generate null keys and values of size `messageSize`
* @param messageSize number of bytes used for key + value
*/
public PayloadGenerator(int messageSize) {
this(messageSize, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
}
/**
* Generator will generate keys of given type and values of size 'messageSize' - (key size).
* If the given key type requires more bytes than messageSize, then the resulting payload
* will be keys of size required for the given key type and 0-length values.
* @param messageSize number of bytes used for key + value
* @param keyType type of keys generated
*/
public PayloadGenerator(int messageSize, PayloadKeyType keyType) {
this(messageSize, keyType, DEFAULT_VALUE_DIVERGENCE_RATIO);
}
/**
* Generator will generate keys of given type and values of size 'messageSize' - (key size).
* If the given key type requires more bytes than messageSize, then the resulting payload
* will be keys of size required for the given key type and 0-length values.
* @param messageSize key + value size
* @param valueDivergenceRatio ratio of how much each next value is different from the previous
* value. Used to approximately control target compression rate (if
* compression is used).
*/
public PayloadGenerator(int messageSize, PayloadKeyType keyType,
double valueDivergenceRatio) {
this.baseSeed = 856; // some random number, may later let pass seed to constructor
this.currentPosition = 0;
this.valueDivergenceRatio = valueDivergenceRatio;
this.random = new Random(this.baseSeed);
final int valueSize = (messageSize > keyType.maxSizeInBytes())
? messageSize - keyType.maxSizeInBytes() : 0;
this.baseRecordValue = new byte[valueSize];
// initialize value with random bytes
for (int i = 0; i < baseRecordValue.length; ++i) {
baseRecordValue[i] = (byte) (random.nextInt(26) + 65);
}
this.recordKeyType = keyType;
}
/**
* Returns current position of the payload generator.
*/
public long position() {
return currentPosition;
}
/**
* Creates record based on the current position, and increments current position.
*/
public ProducerRecord<byte[], byte[]> nextRecord(String topicName) {
return nextRecord(topicName, currentPosition++);
}
/**
* Creates record based on the given position. Does not change the current position.
*/
public ProducerRecord<byte[], byte[]> nextRecord(String topicName, long position) {
byte[] keyBytes = null;
if (recordKeyType == PayloadKeyType.KEY_MESSAGE_INDEX) {
keyBytes = ByteBuffer.allocate(recordKeyType.maxSizeInBytes()).putLong(position).array();
} else if (recordKeyType != PayloadKeyType.KEY_NULL) {
throw new UnsupportedOperationException(
"PayloadGenerator does not know how to generate key for key type " + recordKeyType);
}
return new ProducerRecord<>(topicName, keyBytes, nextValue(position));
}
@Override
public String toString() {
return "PayloadGenerator(recordKeySize=" + recordKeyType.maxSizeInBytes()
+ ", recordValueSize=" + baseRecordValue.length
+ ", valueDivergenceRatio=" + valueDivergenceRatio + ")";
}
/**
* Returns producer record value
*/
private byte[] nextValue(long position) {
// set the seed based on the given position to make sure that the same value is generated
// for the same position.
random.setSeed(baseSeed + 31 * position + 1);
// randomize some of the payload to achieve expected compression rate
byte[] recordValue = Arrays.copyOf(baseRecordValue, baseRecordValue.length);
for (int i = 0; i < recordValue.length * valueDivergenceRatio; ++i)
recordValue[i] = (byte) (random.nextInt(26) + 65);
return recordValue;
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
/**
* Describes a key in producer payload
*/
public enum PayloadKeyType {
// null key
KEY_NULL(0),
// fixed size key containing a long integer representing a message index (i.e., position of
// the payload generator)
KEY_MESSAGE_INDEX(8);
private final int maxSizeInBytes;
PayloadKeyType(int maxSizeInBytes) {
this.maxSizeInBytes = maxSizeInBytes;
}
public int maxSizeInBytes() {
return maxSizeInBytes;
}
}

View File

@ -36,6 +36,7 @@ public class ProduceBenchSpec extends TaskSpec {
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final int maxMessages;
private final int messageSize;
private final Map<String, String> producerConf;
private final int totalTopics;
private final int activeTopics;
@ -47,6 +48,7 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") int maxMessages,
@JsonProperty("messageSize") int messageSize,
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("totalTopics") int totalTopics,
@JsonProperty("activeTopics") int activeTopics) {
@ -55,6 +57,7 @@ public class ProduceBenchSpec extends TaskSpec {
this.bootstrapServers = bootstrapServers;
this.targetMessagesPerSec = targetMessagesPerSec;
this.maxMessages = maxMessages;
this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize;
this.producerConf = producerConf;
this.totalTopics = totalTopics;
this.activeTopics = activeTopics;
@ -80,6 +83,11 @@ public class ProduceBenchSpec extends TaskSpec {
return maxMessages;
}
@JsonProperty
public int messageSize() {
return messageSize;
}
@JsonProperty
public Map<String, String> producerConf() {
return producerConf;

View File

@ -56,8 +56,6 @@ public class ProduceBenchWorker implements TaskWorker {
private static final short REPLICATION_FACTOR = 3;
private static final int MESSAGE_SIZE = 512;
private static final int THROTTLE_PERIOD_MS = 100;
private final String id;
@ -174,6 +172,8 @@ public class ProduceBenchWorker implements TaskWorker {
private final KafkaProducer<byte[], byte[]> producer;
private final PayloadGenerator payloadGenerator;
private final Throttle throttle;
SendRecords() {
@ -187,6 +187,7 @@ public class ProduceBenchWorker implements TaskWorker {
props.setProperty(entry.getKey(), entry.getValue());
}
this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
this.payloadGenerator = new PayloadGenerator(spec.messageSize());
this.throttle = new SendRecordsThrottle(perPeriod, producer);
}
@ -194,13 +195,11 @@ public class ProduceBenchWorker implements TaskWorker {
public Void call() throws Exception {
long startTimeMs = Time.SYSTEM.milliseconds();
try {
byte[] key = new byte[MESSAGE_SIZE];
byte[] value = new byte[MESSAGE_SIZE];
Future<RecordMetadata> future = null;
try {
for (int m = 0; m < spec.maxMessages(); m++) {
for (int i = 0; i < spec.activeTopics(); i++) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicIndexToName(i), key, value);
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(topicIndexToName(i));
future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
}
throttle.increment();
@ -217,6 +216,7 @@ public class ProduceBenchWorker implements TaskWorker {
statusUpdaterFuture.cancel(false);
new StatusUpdater(histogram).run();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("Produced {}", payloadGenerator);
log.info("Sent {} total record(s) in {} ms. status: {}",
histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
}

View File

@ -33,8 +33,6 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
@ -44,6 +42,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -60,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class RoundTripWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
private static final int VALUE_SIZE = 512;
private static final int MESSAGE_SIZE = 512;
private static final int LOG_INTERVAL_MS = 5000;
@ -82,9 +81,11 @@ public class RoundTripWorker implements TaskWorker {
private KafkaFutureImpl<String> doneFuture;
private KafkaProducer<String, byte[]> producer;
private KafkaProducer<byte[], byte[]> producer;
private KafkaConsumer<String, byte[]> consumer;
private PayloadGenerator payloadGenerator;
private KafkaConsumer<byte[], byte[]> consumer;
private CountDownLatch unackedSends;
@ -177,16 +178,16 @@ public class RoundTripWorker implements TaskWorker {
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
producer = new KafkaProducer<>(props, new StringSerializer(),
producer = new KafkaProducer<>(props, new ByteArraySerializer(),
new ByteArraySerializer());
int perPeriod = WorkerUtils.
perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
payloadGenerator = new PayloadGenerator(MESSAGE_SIZE, PayloadKeyType.KEY_MESSAGE_INDEX);
}
@Override
public void run() {
byte[] value = new byte[VALUE_SIZE];
final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages());
long messagesSent = 0;
long uniqueMessagesSent = 0;
@ -204,8 +205,8 @@ public class RoundTripWorker implements TaskWorker {
uniqueMessagesSent++;
}
messagesSent++;
ProducerRecord<String, byte[]> record =
new ProducerRecord<>(TOPIC_NAME, 0, String.valueOf(messageIndex), value);
// we explicitly specify generator position based on message index
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(TOPIC_NAME, messageIndex);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
@ -267,7 +268,7 @@ public class RoundTripWorker implements TaskWorker {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
consumer = new KafkaConsumer<>(props, new StringDeserializer(),
consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
consumer.subscribe(Collections.singleton(TOPIC_NAME));
}
@ -283,9 +284,9 @@ public class RoundTripWorker implements TaskWorker {
while (true) {
try {
pollInvoked++;
ConsumerRecords<String, byte[]> records = consumer.poll(50);
for (ConsumerRecord<String, byte[]> record : records.records(TOPIC_NAME)) {
int messageIndex = Integer.parseInt(record.key());
ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) {
int messageIndex = ByteBuffer.wrap(record.key()).getInt();
messagesReceived++;
if (toReceiveTracker.removePending(messageIndex)) {
uniqueMessagesReceived++;

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class PayloadGeneratorTest {
@Test
public void testGeneratorStartsAtPositionZero() {
PayloadGenerator payloadGenerator = new PayloadGenerator();
assertEquals(0, payloadGenerator.position());
}
@Test
public void testDefaultPayload() {
final long numRecords = 262;
PayloadGenerator payloadGenerator = new PayloadGenerator();
// make sure that each time we produce a different value (except if compression rate is 0)
byte[] prevValue = null;
long expectedPosition = 0;
for (int i = 0; i < numRecords; i++) {
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
assertNull(record.key());
assertEquals(PayloadGenerator.DEFAULT_MESSAGE_SIZE, record.value().length);
assertEquals(++expectedPosition, payloadGenerator.position());
assertFalse("Position " + payloadGenerator.position(),
Arrays.equals(prevValue, record.value()));
prevValue = record.value().clone();
}
}
@Test
public void testNullKeyTypeValueSizeIsMessageSize() {
final int size = 200;
PayloadGenerator payloadGenerator = new PayloadGenerator(size);
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
assertNull(record.key());
assertEquals(size, record.value().length);
}
@Test
public void testKeyContainsGeneratorPosition() {
final long numRecords = 10;
final int size = 200;
PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
for (int i = 0; i < numRecords; i++) {
assertEquals(i, generator.position());
ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic");
assertEquals(8, record.key().length);
assertEquals(size - 8, record.value().length);
assertEquals("i=" + i, i, ByteBuffer.wrap(record.key()).getLong());
}
}
@Test
public void testGeneratePayloadWithExplicitPosition() {
final int size = 200;
PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
int position = 2;
while (position < 5000000) {
ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic", position);
assertEquals(8, record.key().length);
assertEquals(size - 8, record.value().length);
assertEquals(position, ByteBuffer.wrap(record.key()).getLong());
position = position * 64;
}
}
public void testSamePositionGeneratesSameKeyAndValue() {
final int size = 100;
PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
ProducerRecord<byte[], byte[]> record1 = generator.nextRecord("test-topic");
assertEquals(1, generator.position());
ProducerRecord<byte[], byte[]> record2 = generator.nextRecord("test-topic");
assertEquals(2, generator.position());
ProducerRecord<byte[], byte[]> record3 = generator.nextRecord("test-topic", 0);
// position should not change if we generated record with specific position
assertEquals(2, generator.position());
assertFalse("Values at different positions should not match.",
Arrays.equals(record1.value(), record2.value()));
assertFalse("Values at different positions should not match.",
Arrays.equals(record3.value(), record2.value()));
assertTrue("Values at the same position should match.",
Arrays.equals(record1.value(), record3.value()));
}
@Test
public void testGeneratesDeterministicKeyValues() {
final long numRecords = 194;
final int size = 100;
PayloadGenerator generator1 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
PayloadGenerator generator2 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
for (int i = 0; i < numRecords; ++i) {
ProducerRecord<byte[], byte[]> record1 = generator1.nextRecord("test-topic");
ProducerRecord<byte[], byte[]> record2 = generator2.nextRecord("test-topic");
assertTrue(Arrays.equals(record1.value(), record2.value()));
assertTrue(Arrays.equals(record1.key(), record2.key()));
}
}
@Test
public void testTooSmallMessageSizeCreatesPayloadWithOneByteValues() {
PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic", 877);
assertEquals(8, record.key().length);
assertEquals(0, record.value().length);
}
@Test
public void testNextRecordGeneratesNewByteArrayForValue() {
PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
ProducerRecord<byte[], byte[]> record1 = payloadGenerator.nextRecord("test-topic", 877);
ProducerRecord<byte[], byte[]> record2 = payloadGenerator.nextRecord("test-topic", 877);
assertNotEquals(record1.value(), record2.value());
}
}