MINOR: Use `Record` instead of `ByteBufferMessageSet` in `ProduceRequestTest`

We want to phase out `ByteBufferMessageSet` eventually, so new code should favour `Record` where possible.

Also use a fixed timestamp in `testCorruptLz4ProduceRequest` to ensure that
the checksum is always the same.
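For illustration only (not part of the original commit message), a minimal sketch of the determinism a fixed timestamp buys, using the `partitionRecordsBuffer` helper added below — the record CRC covers the timestamp, so a wall-clock timestamp would change the serialized bytes on every run:

    // Sketch only: with a fixed timestamp, two builds of the same record are
    // byte-identical, so corrupting a known byte offset (byte 40 in the test
    // below) reliably hits the same field every run.
    val first = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
      new Record(1000000L, "key".getBytes, "value".getBytes))
    val second = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
      new Record(1000000L, "key".getBytes, "value".getBytes))
    assert(first == second) // ByteBuffer.equals compares the remaining bytes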

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #1357 from ijuma/produce-request-test-improvement

clients/src/test/java/org/apache/kafka/test/TestUtils.java

@@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -31,9 +32,12 @@ import java.util.Random;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.Utils;

 /**
  * Helper functions for writing unit tests
  */
@@ -141,4 +145,22 @@ public class TestUtils {
         return file;
     }

+    /**
+     * Create a records buffer including the offset and message size at the start, which is required if the buffer
+     * is to be sent as part of `ProduceRequest`. This is the reason why we can't use
+     * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as
+     * this constructor does not include either of these fields.
+     */
+    public static ByteBuffer partitionRecordsBuffer(long offset, CompressionType compressionType, Record... records) {
+        int bufferSize = 0;
+        for (Record record : records)
+            bufferSize += Records.LOG_OVERHEAD + record.size();
+        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+        MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+        for (Record record : records)
+            memoryRecords.append(offset, record);
+        memoryRecords.close();
+        return memoryRecords.buffer();
+    }
 }
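As a side note on the framing the javadoc describes: `Records.LOG_OVERHEAD` is the 12 bytes of offset (8) plus size (4) that precede each entry. A minimal sketch of what the helper returns for a single uncompressed record — assumed usage, not from the commit, and assuming the buffer's position is 0 after `close()`:

    import org.apache.kafka.common.record.{CompressionType, Record, Records}
    import org.apache.kafka.test.{TestUtils => JTestUtils}

    val buffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
      new Record(1000000L, "key".getBytes, "value".getBytes))
    assert(buffer.getLong(0) == 0L)                                     // 8-byte offset field
    assert(buffer.getInt(8) == buffer.remaining - Records.LOG_OVERHEAD) // 4-byte size field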

core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala

@@ -17,10 +17,13 @@
 package kafka.server

-import kafka.message.{ByteBufferMessageSet, LZ4CompressionCodec, Message}
+import java.nio.ByteBuffer
+
 import kafka.utils.TestUtils
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.record.{CompressionType, Record}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
 import org.junit.Test
@@ -36,17 +39,26 @@ class ProduceRequestTest extends BaseRequestTest {
   @Test
   def testSimpleProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
-    val messageBuffer = new ByteBufferMessageSet(new Message("value".getBytes, "key".getBytes,
-      System.currentTimeMillis(), 1: Byte)).buffer
-    val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = Map(topicPartition -> messageBuffer)
-    val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
-    assertEquals(1, produceResponse.responses.size)
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
-    assertEquals(topicPartition, tp)
-    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
-    assertEquals(0, partitionResponse.baseOffset)
-    assertEquals(-1, partitionResponse.timestamp)
+
+    def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+      val topicPartition = new TopicPartition("topic", partition)
+      val partitionRecords = Map(topicPartition -> recordBuffer)
+      val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+      assertEquals(1, produceResponse.responses.size)
+      val (tp, partitionResponse) = produceResponse.responses.asScala.head
+      assertEquals(topicPartition, tp)
+      assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+      assertEquals(expectedOffset, partitionResponse.baseOffset)
+      assertEquals(-1, partitionResponse.timestamp)
+      partitionResponse
+    }
+
+    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
+      new Record(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
+
+    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP,
+      new Record(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
+      new Record(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }

   /* returns a pair of partition id and leader id */
@@ -60,12 +72,13 @@ class ProduceRequestTest extends BaseRequestTest {
   @Test
   def testCorruptLz4ProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
-    val messageBuffer = new ByteBufferMessageSet(LZ4CompressionCodec, new Message("value".getBytes, "key".getBytes,
-      System.currentTimeMillis(), 1: Byte)).buffer
+    val timestamp = 1000000
+    val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
+      new Record(timestamp, "key".getBytes, "value".getBytes))
     // Change the lz4 checksum value so that it doesn't match the contents
-    messageBuffer.array.update(40, 0)
+    recordBuffer.array.update(40, 0)
     val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = Map(topicPartition -> messageBuffer)
+    val partitionRecords = Map(topicPartition -> recordBuffer)
     val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head