KAFKA-1253 Compression in the new producer: follow up patch to push new files

Neha Narkhede 2014-03-26 21:50:30 -07:00
parent 466a83b78c
commit 9bc47bc136
5 changed files with 535 additions and 0 deletions
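
For context, the new producer selects its compression codec through client configuration; the end-to-end path is exercised by ProducerCompressionTest at the bottom of this commit. A minimal, hypothetical usage sketch (the config keys and producer calls mirror that test; the broker address and topic name are placeholders, not part of this patch):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressedProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");  // placeholder broker address
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");       // or "snappy"
        KafkaProducer producer = new KafkaProducer(props);
        try {
            // the producer batches records and compresses each batch into a single wrapper message
            producer.send(new ProducerRecord("my-topic", null, null, "value".getBytes())).get();
        } finally {
            producer.close();
        }
    }
}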


@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
 * A byte buffer backed input stream
 */
public class ByteBufferInputStream extends InputStream {
private ByteBuffer buffer;
public ByteBufferInputStream(ByteBuffer buffer) {
this.buffer = buffer;
}
public int read() {
if (!buffer.hasRemaining()) {
return -1;
}
return buffer.get() & 0xFF;
}
public int read(byte[] bytes, int off, int len) {
if (!buffer.hasRemaining()) {
return -1;
}
len = Math.min(len, buffer.remaining());
buffer.get(bytes, off, len);
return len;
}
}
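
A quick illustration of the new input stream, not part of the patch: it simply drains whatever remains in the wrapped buffer and then reports end of stream (assumes java.nio.ByteBuffer is imported).

// hypothetical usage sketch, not part of this commit
ByteBuffer buf = ByteBuffer.wrap("hello".getBytes());
ByteBufferInputStream in = new ByteBufferInputStream(buf);
byte[] out = new byte[8];
int n = in.read(out, 0, out.length);  // returns 5, the bytes remaining in the buffer
int eof = in.read();                  // returns -1 once the buffer is exhausted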


@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
 * A byte buffer backed output stream that grows the underlying buffer as needed
 */
public class ByteBufferOutputStream extends OutputStream {
    private static final float REALLOCATION_FACTOR = 1.1f;
private ByteBuffer buffer;
public ByteBufferOutputStream(ByteBuffer buffer) {
this.buffer = buffer;
}
public void write(int b) {
if (buffer.remaining() < 1)
expandBuffer(buffer.capacity() + 1);
buffer.put((byte) b);
}
public void write(byte[] bytes, int off, int len) {
if (buffer.remaining() < len)
expandBuffer(buffer.capacity() + len);
buffer.put(bytes, off, len);
}
public ByteBuffer buffer() {
return buffer;
}
private void expandBuffer(int size) {
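        // grow to at least REALLOCATION_FACTOR times the current capacity (or to the requested
        // size, whichever is larger) and carry over the bytes written so far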
int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
ByteBuffer temp = ByteBuffer.allocate(expandSize);
temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
buffer = temp;
}
}
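
A matching sketch for the output stream, also not part of the patch, showing the automatic reallocation when the initial buffer is too small:

// hypothetical usage sketch, not part of this commit
ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(4));
out.write("hello world".getBytes(), 0, 11);  // larger than the initial capacity, so expandBuffer() kicks in
ByteBuffer written = out.buffer();           // may be a different (larger) buffer than the one passed in
// written.position() == 11; flip() it before reading the bytes back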


@@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class Compressor {
    private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
    private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
    private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
private static float[] typeToRate;
private static int MAX_TYPE_ID = -1;
static {
for (CompressionType type : CompressionType.values()) {
MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id);
}
typeToRate = new float[MAX_TYPE_ID+1];
for (CompressionType type : CompressionType.values()) {
typeToRate[type.id] = type.rate;
}
}
private final CompressionType type;
private final DataOutputStream appendStream;
private final ByteBufferOutputStream bufferStream;
private final int initPos;
public long writtenUncompressed;
public long numRecords;
public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
this.type = type;
this.initPos = buffer.position();
this.numRecords = 0;
this.writtenUncompressed = 0;
if (type != CompressionType.NONE) {
// for compressed records, leave space for the header and the shallow message metadata
// and move the starting position to the value payload offset
buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
}
// create the stream
bufferStream = new ByteBufferOutputStream(buffer);
appendStream = wrapForOutput(bufferStream, type, blockSize);
}
public Compressor(ByteBuffer buffer, CompressionType type) {
this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}
public ByteBuffer buffer() {
return bufferStream.buffer();
}
public void close() {
try {
appendStream.close();
} catch (IOException e) {
throw new KafkaException(e);
}
if (type != CompressionType.NONE) {
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
            // write the log overhead header: the offset is written as (number of records - 1),
            // followed by the size of the shallow wrapper message
buffer.position(initPos);
buffer.putLong(numRecords - 1);
buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
// write the shallow message (the crc and value size are not correct yet)
Record.write(buffer, null, null, type, 0, -1);
            // compute and fill the value size
int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize);
// compute and fill the crc at the beginning of the message
long crc = Record.computeChecksum(buffer,
initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
// reset the position
buffer.position(pos);
// update the compression ratio
float compressionRate = (float) buffer.position() / this.writtenUncompressed;
typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
}
}
    // Note that for all the write operations below, IO exceptions should
    // never be thrown since the underlying ByteBufferOutputStream does not throw IOException;
    // if one does occur it indicates an unexpected failure and is rethrown as a KafkaException.
public void putLong(final long value) {
try {
appendStream.writeLong(value);
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
public void putInt(final int value) {
try {
appendStream.writeInt(value);
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
public void put(final ByteBuffer buffer) {
try {
appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
public void putByte(final byte value) {
try {
appendStream.write(value);
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
public void put(final byte[] bytes, final int offset, final int len) {
try {
appendStream.write(bytes, offset, len);
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
        // put a record, in its uncompressed form, into the underlying stream
long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize);
byte attributes = Record.computeAttributes(type);
putRecord(crc, attributes, key, value, valueOffset, valueSize);
}
public void putRecord(byte[] key, byte[] value) {
putRecord(key, value, CompressionType.NONE, 0, -1);
}
private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
Record.write(this, crc, attributes, key, value, valueOffset, valueSize);
}
public void recordWritten(int size) {
numRecords += 1;
writtenUncompressed += size;
}
public long estimatedBytesWritten() {
if (type == CompressionType.NONE) {
return bufferStream.buffer().position();
} else {
// estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
}
}
    // the following two functions also need to be public since they are used during MemoryRecords iteration
    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
try {
switch (type) {
case NONE:
return new DataOutputStream(buffer);
case GZIP:
return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
case SNAPPY:
// dynamically load the snappy class to avoid runtime dependency
// on snappy if we are not using it
try {
Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream");
OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE)
.newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
} catch (IOException e) {
throw new KafkaException(e);
}
}
    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) {
try {
switch (type) {
case NONE:
return new DataInputStream(buffer);
case GZIP:
return new DataInputStream(new GZIPInputStream(buffer));
case SNAPPY:
// dynamically load the snappy class to avoid runtime dependency
// on snappy if we are not using it
try {
Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream");
InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class)
.newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
} catch (IOException e) {
throw new KafkaException(e);
}
}
}
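
To make the two static wrappers concrete, here is a minimal round-trip sketch of my own (not part of the patch), assuming GZIP and an illustrative 512-byte block size; it uses only the classes added in this commit plus the existing CompressionType enum and java.io/java.nio:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.ByteBufferInputStream;
import org.apache.kafka.common.record.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Compressor;

public class CompressorRoundTripExample {
    public static void main(String[] args) throws Exception {
        // compress a few primitives into a byte buffer
        ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(ByteBuffer.allocate(1024));
        DataOutputStream out = Compressor.wrapForOutput(bufferStream, CompressionType.GZIP, 512);
        out.writeLong(42L);
        out.writeInt(7);
        out.close();  // closing flushes the GZIP trailer into the buffer

        // read the values back through the matching input wrapper
        ByteBuffer compressed = bufferStream.buffer();
        compressed.flip();
        DataInputStream in = Compressor.wrapForInput(new ByteBufferInputStream(compressed), CompressionType.GZIP);
        System.out.println(in.readLong() + " " + in.readInt());  // prints "42 7"
    }
}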


@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import java.nio.ByteBuffer;
public class CrcTest {
@Test
    public void testUpdate() {
        final byte[] bytes = "Any String you want".getBytes();
        final int len = bytes.length;

        Crc32 crc1 = new Crc32();
        Crc32 crc2 = new Crc32();
        Crc32 crc3 = new Crc32();

        // one-shot, byte-at-a-time, and two-chunk updates should all produce the same checksum
        crc1.update(bytes, 0, len);
        for (int i = 0; i < len; i++)
            crc2.update(bytes[i]);
        crc3.update(bytes, 0, len / 2);
        crc3.update(bytes, len / 2, len - len / 2);

        assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
        assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue());
    }
@Test
public void testUpdateInt() {
final int value = 1000;
final ByteBuffer buffer = ByteBuffer.allocate(4);
buffer.putInt(value);
Crc32 crc1 = new Crc32();
Crc32 crc2 = new Crc32();
crc1.updateInt(value);
crc2.update(buffer.array(), buffer.arrayOffset(), 4);
assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
}
}


@@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api.test
import java.util.{Properties, Collection, ArrayList}
import org.scalatest.junit.JUnit3Suite
import org.junit.runners.Parameterized
import org.junit.runner.RunWith
import org.junit.runners.Parameterized.Parameters
import org.junit.{After, Before, Test}
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
import org.junit.Assert._
import kafka.api.FetchRequestBuilder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{Utils, TestUtils}
import scala.Array
@RunWith(value = classOf[Parameterized])
class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
private val brokerId = 0
private val port = TestUtils.choosePort
private var server: KafkaServer = null
private val props = TestUtils.createBrokerConfig(brokerId, port)
private val config = new KafkaConfig(props)
private val topic = "topic"
private val numRecords = 100
@Before
override def setUp() {
super.setUp()
server = TestUtils.createServer(config)
}
@After
override def tearDown() {
server.shutdown
Utils.rm(server.config.logDirs)
super.tearDown()
}
  /**
   * testCompression
   *
   * Compressed messages should be able to be sent and consumed correctly
   */
@Test
def testCompression() {
val props = new Properties()
props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
var producer = new KafkaProducer(props)
try {
// create topic
TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
val partition = 0
      // prepare the messages
      val messages = for (i <- 0 until numRecords)
        yield ("value" + i).getBytes
// make sure the returned messages are correct
val responses = for (message <- messages)
yield producer.send(new ProducerRecord(topic, null, null, message))
val futures = responses.toList
for ((future, offset) <- futures zip (0 until numRecords)) {
assertEquals(offset.toLong, future.get.offset)
}
// make sure the fetched message count match
val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer
assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size)
var index = 0
for (message <- messages) {
assertEquals(new Message(bytes = message), messageSet(index).message)
assertEquals(index.toLong, messageSet(index).offset)
index += 1
}
} finally {
if (producer != null) {
producer.close()
producer = null
}
}
}
}
object ProducerCompressionTest {
// NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]).
@Parameters
def parameters: Collection[Array[String]] = {
val list = new ArrayList[Array[String]]()
list.add(Array("gzip"))
list.add(Array("snappy"))
list
}
}