diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java new file mode 100644 index 00000000000..9acd5fad2d7 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.trogdor.workload; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +/** + * Describes the payload for the producer record. Currently, it generates constant size values + * and either null keys or constant size key (depending on requested key type). The generator + * is deterministic -- two generator objects created with the same key type, message size, and + * value divergence ratio (see `valueDivergenceRatio` description) will generate the same sequence + * of key/value pairs. + */ +public class PayloadGenerator { + + public static final double DEFAULT_VALUE_DIVERGENCE_RATIO = 0.3; + public static final int DEFAULT_MESSAGE_SIZE = 512; + + /** + * This is the ratio of how much each next value is different from the previous value. This + * is directly related to compression rate we will get. Example: 0.3 divergence ratio gets us + * about 0.3 - 0.45 compression rate with lz4. + */ + private final double valueDivergenceRatio; + private final long baseSeed; + private long currentPosition; + private byte[] baseRecordValue; + private PayloadKeyType recordKeyType; + private Random random; + + public PayloadGenerator() { + this(DEFAULT_MESSAGE_SIZE, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO); + } + + /** + * Generator will generate null keys and values of size `messageSize` + * @param messageSize number of bytes used for key + value + */ + public PayloadGenerator(int messageSize) { + this(messageSize, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO); + } + + /** + * Generator will generate keys of given type and values of size 'messageSize' - (key size). + * If the given key type requires more bytes than messageSize, then the resulting payload + * will be keys of size required for the given key type and 0-length values. + * @param messageSize number of bytes used for key + value + * @param keyType type of keys generated + */ + public PayloadGenerator(int messageSize, PayloadKeyType keyType) { + this(messageSize, keyType, DEFAULT_VALUE_DIVERGENCE_RATIO); + } + + /** + * Generator will generate keys of given type and values of size 'messageSize' - (key size). + * If the given key type requires more bytes than messageSize, then the resulting payload + * will be keys of size required for the given key type and 0-length values. + * @param messageSize key + value size + * @param valueDivergenceRatio ratio of how much each next value is different from the previous + * value. Used to approximately control target compression rate (if + * compression is used). + */ + public PayloadGenerator(int messageSize, PayloadKeyType keyType, + double valueDivergenceRatio) { + this.baseSeed = 856; // some random number, may later let pass seed to constructor + this.currentPosition = 0; + this.valueDivergenceRatio = valueDivergenceRatio; + this.random = new Random(this.baseSeed); + + final int valueSize = (messageSize > keyType.maxSizeInBytes()) + ? messageSize - keyType.maxSizeInBytes() : 0; + this.baseRecordValue = new byte[valueSize]; + // initialize value with random bytes + for (int i = 0; i < baseRecordValue.length; ++i) { + baseRecordValue[i] = (byte) (random.nextInt(26) + 65); + } + this.recordKeyType = keyType; + } + + /** + * Returns current position of the payload generator. + */ + public long position() { + return currentPosition; + } + + /** + * Creates record based on the current position, and increments current position. + */ + public ProducerRecord nextRecord(String topicName) { + return nextRecord(topicName, currentPosition++); + } + + /** + * Creates record based on the given position. Does not change the current position. + */ + public ProducerRecord nextRecord(String topicName, long position) { + byte[] keyBytes = null; + if (recordKeyType == PayloadKeyType.KEY_MESSAGE_INDEX) { + keyBytes = ByteBuffer.allocate(recordKeyType.maxSizeInBytes()).putLong(position).array(); + } else if (recordKeyType != PayloadKeyType.KEY_NULL) { + throw new UnsupportedOperationException( + "PayloadGenerator does not know how to generate key for key type " + recordKeyType); + } + return new ProducerRecord<>(topicName, keyBytes, nextValue(position)); + } + + @Override + public String toString() { + return "PayloadGenerator(recordKeySize=" + recordKeyType.maxSizeInBytes() + + ", recordValueSize=" + baseRecordValue.length + + ", valueDivergenceRatio=" + valueDivergenceRatio + ")"; + } + + /** + * Returns producer record value + */ + private byte[] nextValue(long position) { + // set the seed based on the given position to make sure that the same value is generated + // for the same position. + random.setSeed(baseSeed + 31 * position + 1); + // randomize some of the payload to achieve expected compression rate + byte[] recordValue = Arrays.copyOf(baseRecordValue, baseRecordValue.length); + for (int i = 0; i < recordValue.length * valueDivergenceRatio; ++i) + recordValue[i] = (byte) (random.nextInt(26) + 65); + return recordValue; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java new file mode 100644 index 00000000000..3ed98cd840b --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +/** + * Describes a key in producer payload + */ +public enum PayloadKeyType { + // null key + KEY_NULL(0), + // fixed size key containing a long integer representing a message index (i.e., position of + // the payload generator) + KEY_MESSAGE_INDEX(8); + + private final int maxSizeInBytes; + + PayloadKeyType(int maxSizeInBytes) { + this.maxSizeInBytes = maxSizeInBytes; + } + + public int maxSizeInBytes() { + return maxSizeInBytes; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java index 9f25842727d..efb2d859cd5 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java @@ -36,6 +36,7 @@ public class ProduceBenchSpec extends TaskSpec { private final String bootstrapServers; private final int targetMessagesPerSec; private final int maxMessages; + private final int messageSize; private final Map producerConf; private final int totalTopics; private final int activeTopics; @@ -47,6 +48,7 @@ public class ProduceBenchSpec extends TaskSpec { @JsonProperty("bootstrapServers") String bootstrapServers, @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, @JsonProperty("maxMessages") int maxMessages, + @JsonProperty("messageSize") int messageSize, @JsonProperty("producerConf") Map producerConf, @JsonProperty("totalTopics") int totalTopics, @JsonProperty("activeTopics") int activeTopics) { @@ -55,6 +57,7 @@ public class ProduceBenchSpec extends TaskSpec { this.bootstrapServers = bootstrapServers; this.targetMessagesPerSec = targetMessagesPerSec; this.maxMessages = maxMessages; + this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize; this.producerConf = producerConf; this.totalTopics = totalTopics; this.activeTopics = activeTopics; @@ -80,6 +83,11 @@ public class ProduceBenchSpec extends TaskSpec { return maxMessages; } + @JsonProperty + public int messageSize() { + return messageSize; + } + @JsonProperty public Map producerConf() { return producerConf; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 27e49cdc7f4..1bd386d1e3f 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -56,8 +56,6 @@ public class ProduceBenchWorker implements TaskWorker { private static final short REPLICATION_FACTOR = 3; - private static final int MESSAGE_SIZE = 512; - private static final int THROTTLE_PERIOD_MS = 100; private final String id; @@ -174,6 +172,8 @@ public class ProduceBenchWorker implements TaskWorker { private final KafkaProducer producer; + private final PayloadGenerator payloadGenerator; + private final Throttle throttle; SendRecords() { @@ -187,6 +187,7 @@ public class ProduceBenchWorker implements TaskWorker { props.setProperty(entry.getKey(), entry.getValue()); } this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); + this.payloadGenerator = new PayloadGenerator(spec.messageSize()); this.throttle = new SendRecordsThrottle(perPeriod, producer); } @@ -194,13 +195,11 @@ public class ProduceBenchWorker implements TaskWorker { public Void call() throws Exception { long startTimeMs = Time.SYSTEM.milliseconds(); try { - byte[] key = new byte[MESSAGE_SIZE]; - byte[] value = new byte[MESSAGE_SIZE]; Future future = null; try { for (int m = 0; m < spec.maxMessages(); m++) { for (int i = 0; i < spec.activeTopics(); i++) { - ProducerRecord record = new ProducerRecord<>(topicIndexToName(i), key, value); + ProducerRecord record = payloadGenerator.nextRecord(topicIndexToName(i)); future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); } throttle.increment(); @@ -217,6 +216,7 @@ public class ProduceBenchWorker implements TaskWorker { statusUpdaterFuture.cancel(false); new StatusUpdater(histogram).run(); long curTimeMs = Time.SYSTEM.milliseconds(); + log.info("Produced {}", payloadGenerator); log.info("Sent {} total record(s) in {} ms. status: {}", histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get()); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 9031c458058..5dfac1f6c72 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -33,8 +33,6 @@ import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.Platform; @@ -44,6 +42,7 @@ import org.apache.kafka.trogdor.task.TaskWorker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -60,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference; public class RoundTripWorker implements TaskWorker { private static final int THROTTLE_PERIOD_MS = 100; - private static final int VALUE_SIZE = 512; + private static final int MESSAGE_SIZE = 512; private static final int LOG_INTERVAL_MS = 5000; @@ -82,9 +81,11 @@ public class RoundTripWorker implements TaskWorker { private KafkaFutureImpl doneFuture; - private KafkaProducer producer; + private KafkaProducer producer; - private KafkaConsumer consumer; + private PayloadGenerator payloadGenerator; + + private KafkaConsumer consumer; private CountDownLatch unackedSends; @@ -177,16 +178,16 @@ public class RoundTripWorker implements TaskWorker { props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000); - producer = new KafkaProducer<>(props, new StringSerializer(), + producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); int perPeriod = WorkerUtils. perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS); + payloadGenerator = new PayloadGenerator(MESSAGE_SIZE, PayloadKeyType.KEY_MESSAGE_INDEX); } @Override public void run() { - byte[] value = new byte[VALUE_SIZE]; final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages()); long messagesSent = 0; long uniqueMessagesSent = 0; @@ -204,8 +205,8 @@ public class RoundTripWorker implements TaskWorker { uniqueMessagesSent++; } messagesSent++; - ProducerRecord record = - new ProducerRecord<>(TOPIC_NAME, 0, String.valueOf(messageIndex), value); + // we explicitly specify generator position based on message index + ProducerRecord record = payloadGenerator.nextRecord(TOPIC_NAME, messageIndex); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -267,7 +268,7 @@ public class RoundTripWorker implements TaskWorker { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); - consumer = new KafkaConsumer<>(props, new StringDeserializer(), + consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); consumer.subscribe(Collections.singleton(TOPIC_NAME)); } @@ -283,9 +284,9 @@ public class RoundTripWorker implements TaskWorker { while (true) { try { pollInvoked++; - ConsumerRecords records = consumer.poll(50); - for (ConsumerRecord record : records.records(TOPIC_NAME)) { - int messageIndex = Integer.parseInt(record.key()); + ConsumerRecords records = consumer.poll(50); + for (ConsumerRecord record : records.records(TOPIC_NAME)) { + int messageIndex = ByteBuffer.wrap(record.key()).getInt(); messagesReceived++; if (toReceiveTracker.removePending(messageIndex)) { uniqueMessagesReceived++; diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java new file mode 100644 index 00000000000..d2954a5d4cc --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class PayloadGeneratorTest { + + @Test + public void testGeneratorStartsAtPositionZero() { + PayloadGenerator payloadGenerator = new PayloadGenerator(); + assertEquals(0, payloadGenerator.position()); + } + + @Test + public void testDefaultPayload() { + final long numRecords = 262; + PayloadGenerator payloadGenerator = new PayloadGenerator(); + + // make sure that each time we produce a different value (except if compression rate is 0) + byte[] prevValue = null; + long expectedPosition = 0; + for (int i = 0; i < numRecords; i++) { + ProducerRecord record = payloadGenerator.nextRecord("test-topic"); + assertNull(record.key()); + assertEquals(PayloadGenerator.DEFAULT_MESSAGE_SIZE, record.value().length); + assertEquals(++expectedPosition, payloadGenerator.position()); + assertFalse("Position " + payloadGenerator.position(), + Arrays.equals(prevValue, record.value())); + prevValue = record.value().clone(); + } + } + + @Test + public void testNullKeyTypeValueSizeIsMessageSize() { + final int size = 200; + PayloadGenerator payloadGenerator = new PayloadGenerator(size); + ProducerRecord record = payloadGenerator.nextRecord("test-topic"); + assertNull(record.key()); + assertEquals(size, record.value().length); + } + + @Test + public void testKeyContainsGeneratorPosition() { + final long numRecords = 10; + final int size = 200; + PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); + for (int i = 0; i < numRecords; i++) { + assertEquals(i, generator.position()); + ProducerRecord record = generator.nextRecord("test-topic"); + assertEquals(8, record.key().length); + assertEquals(size - 8, record.value().length); + assertEquals("i=" + i, i, ByteBuffer.wrap(record.key()).getLong()); + } + } + + @Test + public void testGeneratePayloadWithExplicitPosition() { + final int size = 200; + PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); + int position = 2; + while (position < 5000000) { + ProducerRecord record = generator.nextRecord("test-topic", position); + assertEquals(8, record.key().length); + assertEquals(size - 8, record.value().length); + assertEquals(position, ByteBuffer.wrap(record.key()).getLong()); + position = position * 64; + } + } + + public void testSamePositionGeneratesSameKeyAndValue() { + final int size = 100; + PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); + ProducerRecord record1 = generator.nextRecord("test-topic"); + assertEquals(1, generator.position()); + ProducerRecord record2 = generator.nextRecord("test-topic"); + assertEquals(2, generator.position()); + ProducerRecord record3 = generator.nextRecord("test-topic", 0); + // position should not change if we generated record with specific position + assertEquals(2, generator.position()); + assertFalse("Values at different positions should not match.", + Arrays.equals(record1.value(), record2.value())); + assertFalse("Values at different positions should not match.", + Arrays.equals(record3.value(), record2.value())); + assertTrue("Values at the same position should match.", + Arrays.equals(record1.value(), record3.value())); + } + + @Test + public void testGeneratesDeterministicKeyValues() { + final long numRecords = 194; + final int size = 100; + PayloadGenerator generator1 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); + PayloadGenerator generator2 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); + for (int i = 0; i < numRecords; ++i) { + ProducerRecord record1 = generator1.nextRecord("test-topic"); + ProducerRecord record2 = generator2.nextRecord("test-topic"); + assertTrue(Arrays.equals(record1.value(), record2.value())); + assertTrue(Arrays.equals(record1.key(), record2.key())); + } + } + + @Test + public void testTooSmallMessageSizeCreatesPayloadWithOneByteValues() { + PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX); + ProducerRecord record = payloadGenerator.nextRecord("test-topic", 877); + assertEquals(8, record.key().length); + assertEquals(0, record.value().length); + } + + @Test + public void testNextRecordGeneratesNewByteArrayForValue() { + PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX); + ProducerRecord record1 = payloadGenerator.nextRecord("test-topic", 877); + ProducerRecord record2 = payloadGenerator.nextRecord("test-topic", 877); + assertNotEquals(record1.value(), record2.value()); + } +}