kafka-1493; Use a well-documented LZ4 compression format and remove redundant LZ4HC option; patched by James Oliver; reviewed by Jun Rao

James Oliver 2014-10-17 10:07:34 -07:00 committed by Jun Rao
parent 4271ecbf06
commit 37356bfee0
14 changed files with 698 additions and 58 deletions
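The heart of this change is the pair of new stream classes below, KafkaLZ4BlockInputStream and KafkaLZ4BlockOutputStream, which replace the jpountz block format (and its LZ4HC variant) with the standard LZ4 frame format. As a minimal round-trip sketch of how the two classes fit together (illustrative only, not part of this commit; the class and method names are taken from the diff below):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;
import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

public class Lz4FrameRoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] payload = "hello lz4 frame".getBytes(StandardCharsets.UTF_8);

        // Compress into memory using the default 64 KB block size.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(sink);
        out.write(payload, 0, payload.length);
        out.close(); // writes the end mark and flushes

        // Decompress; the constructor reads and validates the frame header.
        KafkaLZ4BlockInputStream in =
            new KafkaLZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray()));
        byte[] decoded = new byte[payload.length];
        int n = in.read(decoded, 0, decoded.length);
        in.close();
        System.out.println(new String(decoded, 0, n, StandardCharsets.UTF_8)); // hello lz4 frame
    }
}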

View File: ProducerConfig.java

@@ -153,7 +153,7 @@ public class ProducerConfig extends AbstractConfig {
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
-private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>lz4hc</code>. "
+private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
/** <code>metrics.sample.window.ms</code> */

View File: KafkaLZ4BlockInputStream.java

@@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.message;
import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD;
import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG;
import org.apache.kafka.common.utils.Utils;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
/**
* A partial implementation of the v1.4.1 LZ4 Frame format.
*
* @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
*/
public final class KafkaLZ4BlockInputStream extends FilterInputStream {
public static final String PREMATURE_EOS = "Stream ended prematurely";
public static final String NOT_SUPPORTED = "Stream unsupported";
public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
private final LZ4SafeDecompressor decompressor;
private final XXHash32 checksum;
private final byte[] buffer;
private final byte[] compressedBuffer;
private final int maxBlockSize;
private FLG flg;
private BD bd;
private int bufferOffset;
private int bufferSize;
private boolean finished;
/**
* Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
*
* @param in The stream to decompress
* @throws IOException
*/
public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
super(in);
decompressor = LZ4Factory.fastestInstance().safeDecompressor();
checksum = XXHashFactory.fastestInstance().hash32();
readHeader();
maxBlockSize = bd.getBlockMaximumSize();
buffer = new byte[maxBlockSize];
compressedBuffer = new byte[maxBlockSize];
bufferOffset = 0;
bufferSize = 0;
finished = false;
}
/**
* Reads the magic number and frame descriptor from the underlying {@link InputStream}.
*
* @throws IOException
*/
private void readHeader() throws IOException {
byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
// read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
bufferOffset = 6;
if (in.read(header, 0, bufferOffset) != bufferOffset) {
throw new IOException(PREMATURE_EOS);
}
if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) {
throw new IOException(NOT_SUPPORTED);
}
flg = FLG.fromByte(header[bufferOffset-2]);
bd = BD.fromByte(header[bufferOffset-1]);
// TODO read uncompressed content size, update flg.validate()
// TODO read dictionary id, update flg.validate()
// check stream descriptor hash
byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
header[bufferOffset++] = (byte) in.read();
if (hash != header[bufferOffset-1]) {
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
}
}
/**
* Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum,
* and writes the result to a buffer.
*
* @throws IOException
*/
private void readBlock() throws IOException {
int blockSize = Utils.readUnsignedIntLE(in);
// Check for EndMark
if (blockSize == 0) {
finished = true;
// TODO implement content checksum, update flg.validate()
return;
} else if (blockSize > maxBlockSize) {
throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
}
boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
byte[] bufferToRead;
if (compressed) {
bufferToRead = compressedBuffer;
} else {
blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
bufferToRead = buffer;
bufferSize = blockSize;
}
if (in.read(bufferToRead, 0, blockSize) != blockSize) {
throw new IOException(PREMATURE_EOS);
}
// verify checksum
if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
throw new IOException(BLOCK_HASH_MISMATCH);
}
if (compressed) {
try {
bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
} catch (LZ4Exception e) {
throw new IOException(e);
}
}
bufferOffset = 0;
}
@Override
public int read() throws IOException {
if (finished) {
return -1;
}
if (available() == 0) {
readBlock();
}
if (finished) {
return -1;
}
int value = buffer[bufferOffset++] & 0xFF;
return value;
}
@Override
public int read(byte b[], int off, int len) throws IOException {
net.jpountz.util.Utils.checkRange(b, off, len);
if (finished) {
return -1;
}
if (available() == 0) {
readBlock();
}
if (finished) {
return -1;
}
len = Math.min(len, available());
System.arraycopy(buffer, bufferOffset, b, off, len);
bufferOffset += len;
return len;
}
@Override
public long skip(long n) throws IOException {
if (finished) {
return 0;
}
if (available() == 0) {
readBlock();
}
if (finished) {
return 0;
}
n = Math.min(n, available());
bufferOffset += n;
return n;
}
@Override
public int available() throws IOException {
return bufferSize - bufferOffset;
}
@Override
public void close() throws IOException {
in.close();
}
@Override
public synchronized void mark(int readlimit) {
throw new RuntimeException("mark not supported");
}
@Override
public synchronized void reset() throws IOException {
throw new RuntimeException("reset not supported");
}
@Override
public boolean markSupported() {
return false;
}
}
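
For orientation, the frame layout that this class parses (and that KafkaLZ4BlockOutputStream below produces), summarized from the implementation itself:

// magic     4 bytes   0x184D2204, little-endian
// FLG       1 byte    version and feature flags (see the FLG class below)
// BD        1 byte    block maximum size (see the BD class below)
// HC        1 byte    second byte of the XXHash32 of the preceding header bytes
// blocks    repeated: 4-byte little-endian block size (high bit set means the
//           block is stored uncompressed), block data, then an optional 4-byte
//           XXHash32 block checksum when FLG.blockChecksum is set
// end mark  4 bytes   0x00000000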

View File: KafkaLZ4BlockOutputStream.java

@@ -0,0 +1,387 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.message;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.kafka.common.utils.Utils;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
/**
* A partial implementation of the v1.4.1 LZ4 Frame format.
*
* @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
*/
public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
public static final int MAGIC = 0x184D2204;
public static final int LZ4_MAX_HEADER_LENGTH = 19;
public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
public static final String CLOSED_STREAM = "The stream is already closed";
public static final int BLOCKSIZE_64KB = 4;
public static final int BLOCKSIZE_256KB = 5;
public static final int BLOCKSIZE_1MB = 6;
public static final int BLOCKSIZE_4MB = 7;
private final LZ4Compressor compressor;
private final XXHash32 checksum;
private final FLG flg;
private final BD bd;
private final byte[] buffer;
private final byte[] compressedBuffer;
private final int maxBlockSize;
private int bufferOffset;
private boolean finished;
/**
* Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
*
* @param out The output stream to compress
* @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
* @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for every block of data
* @throws IOException
*/
public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
super(out);
compressor = LZ4Factory.fastestInstance().fastCompressor();
checksum = XXHashFactory.fastestInstance().hash32();
bd = new BD(blockSize);
flg = new FLG(blockChecksum);
bufferOffset = 0;
maxBlockSize = bd.getBlockMaximumSize();
buffer = new byte[maxBlockSize];
compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
finished = false;
writeHeader();
}
/**
* Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
*
* @param out The stream to compress
* @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
* @throws IOException
*/
public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
this(out, blockSize, false);
}
/**
* Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
*
* @param out The output stream to compress
* @throws IOException
*/
public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
this(out, BLOCKSIZE_64KB);
}
/**
* Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
*
* @throws IOException
*/
private void writeHeader() throws IOException {
Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
bufferOffset = 4;
buffer[bufferOffset++] = flg.toByte();
buffer[bufferOffset++] = bd.toByte();
// TODO write uncompressed content size, update flg.validate()
// TODO write dictionary id, update flg.validate()
// compute checksum on all descriptor fields
int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
buffer[bufferOffset++] = (byte) hash;
// write out frame descriptor
out.write(buffer, 0, bufferOffset);
bufferOffset = 0;
}
/**
* Compresses buffered data, optionally computes an XXHash32 checksum, and writes
* the result to the underlying {@link OutputStream}.
*
* @throws IOException
*/
private void writeBlock() throws IOException {
if (bufferOffset == 0) {
return;
}
int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
byte[] bufferToWrite = compressedBuffer;
int compressMethod = 0;
// Store block uncompressed if compressed length is greater (incompressible)
if (compressedLength >= bufferOffset) {
bufferToWrite = buffer;
compressedLength = bufferOffset;
compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
}
// Write content
Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
out.write(bufferToWrite, 0, compressedLength);
// Calculate and write block checksum
if (flg.isBlockChecksumSet()) {
int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
Utils.writeUnsignedIntLE(out, hash);
}
bufferOffset = 0;
}
/**
* Similar to the {@link #writeBlock()} method. Writes a 0-length block
* (without block checksum) to signal the end of the block stream.
*
* @throws IOException
*/
private void writeEndMark() throws IOException {
Utils.writeUnsignedIntLE(out, 0);
// TODO implement content checksum, update flg.validate()
finished = true;
}
@Override
public void write(int b) throws IOException {
ensureNotFinished();
if (bufferOffset == maxBlockSize) {
writeBlock();
}
buffer[bufferOffset++] = (byte) b;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
net.jpountz.util.Utils.checkRange(b, off, len);
ensureNotFinished();
int bufferRemainingLength = maxBlockSize - bufferOffset;
// while b will fill the buffer
while (len > bufferRemainingLength) {
// fill remaining space in buffer
System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
bufferOffset = maxBlockSize;
writeBlock();
// compute new offset and length
off += bufferRemainingLength;
len -= bufferRemainingLength;
bufferRemainingLength = maxBlockSize;
}
System.arraycopy(b, off, buffer, bufferOffset, len);
bufferOffset += len;
}
@Override
public void flush() throws IOException {
if (!finished) {
writeBlock();
}
if (out != null) {
out.flush();
}
}
/**
* A simple state check to ensure the stream is still open.
*/
private void ensureNotFinished() {
if (finished) {
throw new IllegalStateException(CLOSED_STREAM);
}
}
@Override
public void close() throws IOException {
if (!finished) {
writeEndMark();
flush();
finished = true;
}
if (out != null) {
out.close();
out = null;
}
}
public static class FLG {
private static final int VERSION = 1;
private final int presetDictionary;
private final int reserved1;
private final int contentChecksum;
private final int contentSize;
private final int blockChecksum;
private final int blockIndependence;
private final int version;
public FLG() {
this(false);
}
public FLG(boolean blockChecksum) {
this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
}
private FLG(int presetDictionary, int reserved1, int contentChecksum,
int contentSize, int blockChecksum, int blockIndependence, int version) {
this.presetDictionary = presetDictionary;
this.reserved1 = reserved1;
this.contentChecksum = contentChecksum;
this.contentSize = contentSize;
this.blockChecksum = blockChecksum;
this.blockIndependence = blockIndependence;
this.version = version;
validate();
}
public static FLG fromByte(byte flg) {
int presetDictionary = (flg >>> 0) & 1;
int reserved1 = (flg >>> 1) & 1;
int contentChecksum = (flg >>> 2) & 1;
int contentSize = (flg >>> 3) & 1;
int blockChecksum = (flg >>> 4) & 1;
int blockIndependence = (flg >>> 5) & 1;
int version = (flg >>> 6) & 3;
return new FLG(presetDictionary, reserved1, contentChecksum,
contentSize, blockChecksum, blockIndependence, version);
}
public byte toByte() {
return (byte) (
((presetDictionary & 1) << 0)
| ((reserved1 & 1) << 1)
| ((contentChecksum & 1) << 2)
| ((contentSize & 1) << 3)
| ((blockChecksum & 1) << 4)
| ((blockIndependence & 1) << 5)
| ((version & 3) << 6) );
}
private void validate() {
if (presetDictionary != 0) {
throw new RuntimeException("Preset dictionary is unsupported");
}
if (reserved1 != 0) {
throw new RuntimeException("Reserved1 field must be 0");
}
if (contentChecksum != 0) {
throw new RuntimeException("Content checksum is unsupported");
}
if (contentSize != 0) {
throw new RuntimeException("Content size is unsupported");
}
if (blockIndependence != 1) {
throw new RuntimeException("Dependent block stream is unsupported");
}
if (version != VERSION) {
throw new RuntimeException(String.format("Version %d is unsupported", version));
}
}
public boolean isPresetDictionarySet() {
return presetDictionary == 1;
}
public boolean isContentChecksumSet() {
return contentChecksum == 1;
}
public boolean isContentSizeSet() {
return contentSize == 1;
}
public boolean isBlockChecksumSet() {
return blockChecksum == 1;
}
public boolean isBlockIndependenceSet() {
return blockIndependence == 1;
}
public int getVersion() {
return version;
}
}
public static class BD {
private final int reserved2;
private final int blockSizeValue;
private final int reserved3;
public BD() {
this(0, BLOCKSIZE_64KB, 0);
}
public BD(int blockSizeValue) {
this(0, blockSizeValue, 0);
}
private BD(int reserved2, int blockSizeValue, int reserved3) {
this.reserved2 = reserved2;
this.blockSizeValue = blockSizeValue;
this.reserved3 = reserved3;
validate();
}
public static BD fromByte(byte bd) {
int reserved2 = (bd >>> 0) & 15;
int blockMaximumSize = (bd >>> 4) & 7;
int reserved3 = (bd >>> 7) & 1;
return new BD(reserved2, blockMaximumSize, reserved3);
}
private void validate() {
if (reserved2 != 0) {
throw new RuntimeException("Reserved2 field must be 0");
}
if (blockSizeValue < 4 || blockSizeValue > 7) {
throw new RuntimeException("Block size value must be between 4 and 7");
}
if (reserved3 != 0) {
throw new RuntimeException("Reserved3 field must be 0");
}
}
// 2^(2n+8)
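// e.g. blockSizeValue 4 -> 2^16 = 64 KB, 5 -> 256 KB, 6 -> 1 MB, 7 -> 4 MB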
public int getBlockMaximumSize() {
return (1 << ((2 * blockSizeValue) + 8));
}
public byte toByte() {
return (byte) (
((reserved2 & 15) << 0)
| ((blockSizeValue & 7) << 4)
| ((reserved3 & 1) << 7) );
}
}
}
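
A quick sanity check of the descriptor encoding defined above (illustrative, not part of this commit): the default FLG (version 1, independent blocks, no checksums) encodes as 0x60, enabling block checksums sets bit 4 to give 0x70, and the default BD (64 KB blocks) encodes as 0x40.

KafkaLZ4BlockOutputStream.FLG flg = new KafkaLZ4BlockOutputStream.FLG(true);
assert flg.toByte() == (byte) 0x70;   // 0x60 plus the block-checksum bit
KafkaLZ4BlockOutputStream.BD bd = new KafkaLZ4BlockOutputStream.BD();
assert bd.toByte() == (byte) 0x40;    // BLOCKSIZE_64KB in the size bits
assert bd.getBlockMaximumSize() == 64 * 1024; // 2^(2*4+8) = 2^16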

View File: CompressionType.java

@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
* The compression type to use
*/
public enum CompressionType {
-NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
+NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
public final int id;
public final String name;
@@ -42,8 +42,6 @@ public enum CompressionType {
return SNAPPY;
case 3:
return LZ4;
-case 4:
-return LZ4HC;
default:
throw new IllegalArgumentException("Unknown compression type id: " + id);
}
@@ -58,8 +56,6 @@ public enum CompressionType {
return SNAPPY;
else if (LZ4.name.equals(name))
return LZ4;
-else if (LZ4HC.name.equals(name))
-return LZ4HC;
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}

View File: Compressor.java

@@ -218,27 +218,13 @@ public class Compressor {
}
case LZ4:
try {
-Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class)
+Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
.newInstance(buffer);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
-case LZ4HC:
-try {
-Class<?> factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
-Class<?> compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
-Class<?> lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
-Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
-OutputStream stream = (OutputStream) lz4BlockOutputStream
-.getConstructor(OutputStream.class, Integer.TYPE, compressorClass)
-.newInstance(buffer, 1 << 16, compressor);
-return new DataOutputStream(stream);
-} catch (Exception e) {
-throw new KafkaException(e);
-}
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
@@ -266,10 +252,9 @@ public class Compressor {
throw new KafkaException(e);
}
case LZ4:
-case LZ4HC:
// dynamically load LZ4 class to avoid runtime dependency
try {
-Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream");
+Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
.newInstance(buffer);
return new DataInputStream(stream);

View File: Utils.java

@@ -12,6 +12,9 @@
*/
package org.apache.kafka.common.utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
@@ -74,6 +77,34 @@ public class Utils {
return buffer.getInt(index) & 0xffffffffL;
}
/**
* Read an unsigned integer stored in little-endian format from the {@link InputStream}.
*
* @param in The stream to read from
* @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
*/
public static int readUnsignedIntLE(InputStream in) throws IOException {
return (in.read() << 8*0)
| (in.read() << 8*1)
| (in.read() << 8*2)
| (in.read() << 8*3);
}
/**
* Read an unsigned integer stored in little-endian format from a byte array
* at a given offset.
*
* @param buffer The byte array to read from
* @param offset The position in buffer to read from
* @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
*/
public static int readUnsignedIntLE(byte[] buffer, int offset) {
    // mask each byte to 0-255 before shifting so sign extension of negative
    // bytes cannot corrupt the higher-order bits
    return ((buffer[offset++] & 0xff) << 8*0)
        | ((buffer[offset++] & 0xff) << 8*1)
        | ((buffer[offset++] & 0xff) << 8*2)
        | ((buffer[offset] & 0xff) << 8*3);
}
/**
* Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
*
@@ -95,6 +126,35 @@ public class Utils {
buffer.putInt(index, (int) (value & 0xffffffffL));
}
/**
* Write an unsigned integer in little-endian format to the {@link OutputStream}.
*
* @param out The stream to write to
* @param value The value to write
*/
public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
out.write(value >>> 8*0);
out.write(value >>> 8*1);
out.write(value >>> 8*2);
out.write(value >>> 8*3);
}
/**
* Write an unsigned integer in little-endian format to a byte array
* at a given offset.
*
* @param buffer The byte array to write to
* @param offset The position in buffer to write to
* @param value The value to write
*/
public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
buffer[offset++] = (byte) (value >>> 8*0);
buffer[offset++] = (byte) (value >>> 8*1);
buffer[offset++] = (byte) (value >>> 8*2);
buffer[offset] = (byte) (value >>> 8*3);
}
/**
* Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
* java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).

View File: producer.properties

@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
-# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4.
+# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder

View File: CompressionCodec.scala

@@ -24,7 +24,6 @@ object CompressionCodec {
case GZIPCompressionCodec.codec => GZIPCompressionCodec
case SnappyCompressionCodec.codec => SnappyCompressionCodec
case LZ4CompressionCodec.codec => LZ4CompressionCodec
-case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
}
}
@@ -34,7 +33,6 @@ object CompressionCodec {
case GZIPCompressionCodec.name => GZIPCompressionCodec
case SnappyCompressionCodec.name => SnappyCompressionCodec
case LZ4CompressionCodec.name => LZ4CompressionCodec
-case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
}
}
@@ -62,11 +60,6 @@ case object LZ4CompressionCodec extends CompressionCodec {
val name = "lz4"
}
-case object LZ4HCCompressionCodec extends CompressionCodec {
-val codec = 4
-val name = "lz4hc"
-}
case object NoCompressionCodec extends CompressionCodec {
val codec = 0
val name = "none"

View File: CompressionFactory.scala

@@ -22,6 +22,8 @@ import java.util.zip.GZIPOutputStream
import java.util.zip.GZIPInputStream
import java.io.InputStream
import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
object CompressionFactory {
def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
@@ -32,11 +34,7 @@ object CompressionFactory {
import org.xerial.snappy.SnappyOutputStream
new SnappyOutputStream(stream)
case LZ4CompressionCodec =>
-import net.jpountz.lz4.LZ4BlockOutputStream
-new LZ4BlockOutputStream(stream)
-case LZ4HCCompressionCodec =>
-import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
-new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
+new KafkaLZ4BlockOutputStream(stream)
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}
@@ -49,9 +47,8 @@ object CompressionFactory {
case SnappyCompressionCodec =>
import org.xerial.snappy.SnappyInputStream
new SnappyInputStream(stream)
-case LZ4CompressionCodec | LZ4HCCompressionCodec =>
-import net.jpountz.lz4.LZ4BlockInputStream
-new LZ4BlockInputStream(stream)
+case LZ4CompressionCodec =>
+new KafkaLZ4BlockInputStream(stream)
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}

View File: ConsoleProducer.scala

@@ -113,7 +113,7 @@ object ConsoleProducer {
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." +
+val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." +
"If specified without value, then it defaults to 'gzip'")
.withOptionalArg()
.describedAs("compression-codec")

View File: PerfConfig.scala

@@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
.defaultsTo(200)
val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg
-.describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4")
+.describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val helpOpt = parser.accepts("help", "Print usage.")

View File: ProducerCompressionTest.scala

@@ -47,7 +47,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
private val config = new KafkaConfig(props)
private val topic = "topic"
-private val numRecords = 100
+private val numRecords = 2000
@Before
override def setUp() {
@@ -73,6 +73,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
var producer = new KafkaProducer(props)
val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
@@ -125,7 +127,6 @@ object ProducerCompressionTest {
list.add(Array("gzip"))
list.add(Array("snappy"))
list.add(Array("lz4"))
-list.add(Array("lz4hc"))
list
}
}

View File: MessageCompressionTest.scala

@@ -32,8 +32,6 @@ class MessageCompressionTest extends JUnitSuite {
codecs += SnappyCompressionCodec
if(isLZ4Available)
codecs += LZ4CompressionCodec
-if (izLZ4HCAvailable)
-codecs += LZ4HCCompressionCodec
for(codec <- codecs)
testSimpleCompressDecompress(codec)
}
@@ -74,14 +72,4 @@ class MessageCompressionTest extends JUnitSuite {
case e: UnsatisfiedLinkError => false
}
}
-def izLZ4HCAvailable(): Boolean = {
-try {
-val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16,
-net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor())
-true
-} catch {
-case e: UnsatisfiedLinkError => false
-}
-}
}

View File: MessageTest.scala

@@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite {
def setUp(): Unit = {
val keys = Array(null, "key".getBytes, "".getBytes)
val vals = Array("value".getBytes, "".getBytes, null)
-val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec)
+val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec)
for(k <- keys; v <- vals; codec <- codecs)
messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
}