diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index bf4ed66791b..9095caf0db1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -153,7 +153,7 @@ public class ProducerConfig extends AbstractConfig {
 
     /** compression.type */
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
-    private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, snappy, lz4, or lz4hc. "
+    private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + "values are none, gzip, snappy, or lz4. "
                                                        + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
 
     /** metrics.sample.window.ms */
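With lz4hc gone, lz4 is the strongest remaining setting exposed through compression.type. For orientation, a minimal sketch of how a producer opts in under the updated doc string; the broker address is illustrative and not part of this patch:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");             // "lz4hc" is no longer a valid value
    KafkaProducer producer = new KafkaProducer(props);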
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
new file mode 100644
index 00000000000..5be72fef1f9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ *
+ * @see LZ4 Framing Format Spec
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+    public static final String PREMATURE_EOS = "Stream ended prematurely";
+    public static final String NOT_SUPPORTED = "Stream unsupported";
+    public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+    public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+    private final LZ4SafeDecompressor decompressor;
+    private final XXHash32 checksum;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private FLG flg;
+    private BD bd;
+    private int bufferOffset;
+    private int bufferSize;
+    private boolean finished;
+
+    /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     *
+     * @param in The stream to decompress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+        super(in);
+        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        readHeader();
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[maxBlockSize];
+        bufferOffset = 0;
+        bufferSize = 0;
+        finished = false;
+    }
+
+    /**
+     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+     *
+     * @throws IOException
+     */
+    private void readHeader() throws IOException {
+        byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+
+        // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+        bufferOffset = 6;
+        if (in.read(header, 0, bufferOffset) != bufferOffset) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) {
+            throw new IOException(NOT_SUPPORTED);
+        }
+        flg = FLG.fromByte(header[bufferOffset-2]);
+        bd = BD.fromByte(header[bufferOffset-1]);
+        // TODO read uncompressed content size, update flg.validate()
+        // TODO read dictionary id, update flg.validate()
+
+        // check stream descriptor hash
+        byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
+        header[bufferOffset++] = (byte) in.read();
+        if (hash != header[bufferOffset-1]) {
+            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        }
+    }
+
+    /**
+     * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum,
+     * and writes the result to a buffer.
+     *
+     * @throws IOException
+     */
+    private void readBlock() throws IOException {
+        int blockSize = Utils.readUnsignedIntLE(in);
+
+        // Check for EndMark
+        if (blockSize == 0) {
+            finished = true;
+            // TODO implement content checksum, update flg.validate()
+            return;
+        } else if (blockSize > maxBlockSize) {
+            throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+        }
+
+        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+        byte[] bufferToRead;
+        if (compressed) {
+            bufferToRead = compressedBuffer;
+        } else {
+            blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+            bufferToRead = buffer;
+            bufferSize = blockSize;
+        }
+
+        if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        // verify checksum
+        if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+            throw new IOException(BLOCK_HASH_MISMATCH);
+        }
+
+        if (compressed) {
+            try {
+                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+            } catch (LZ4Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        bufferOffset = 0;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+        int value = buffer[bufferOffset++] & 0xFF;
+
+        return value;
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+        len = Math.min(len, available());
+        System.arraycopy(buffer, bufferOffset, b, off, len);
+        bufferOffset += len;
+        return len;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (finished) {
+            return 0;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return 0;
+        }
+        n = Math.min(n, available());
+        bufferOffset += n;
+        return n;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return bufferSize - bufferOffset;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new RuntimeException("mark not supported");
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        throw new RuntimeException("reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+}
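To show how the class above is meant to be driven, here is a minimal decompression sketch. The helper name and the assumption that framedBytes holds one complete LZ4-framed payload (such as one produced by KafkaLZ4BlockOutputStream below) are illustrative, not part of this patch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;

    static byte[] decompress(byte[] framedBytes) throws IOException {
        KafkaLZ4BlockInputStream in =
            new KafkaLZ4BlockInputStream(new ByteArrayInputStream(framedBytes));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int n;
        // read() hands back at most one block's worth of bytes per call
        // and returns -1 once the end mark has been consumed
        while ((n = in.read(chunk, 0, chunk.length)) != -1) {
            out.write(chunk, 0, n);
        }
        in.close();
        return out.toByteArray();
    }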
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 00000000000..e5b9e433e14
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ *
+ * @see LZ4 Framing Format Spec
+ */
+public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+    public static final int MAGIC = 0x184D2204;
+    public static final int LZ4_MAX_HEADER_LENGTH = 19;
+    public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+    public static final String CLOSED_STREAM = "The stream is already closed";
+
+    public static final int BLOCKSIZE_64KB = 4;
+    public static final int BLOCKSIZE_256KB = 5;
+    public static final int BLOCKSIZE_1MB = 6;
+    public static final int BLOCKSIZE_4MB = 7;
+
+    private final LZ4Compressor compressor;
+    private final XXHash32 checksum;
+    private final FLG flg;
+    private final BD bd;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private int bufferOffset;
+    private boolean finished;
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
+     * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for every block of data
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+        super(out);
+        compressor = LZ4Factory.fastestInstance().fastCompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        bd = new BD(blockSize);
+        flg = new FLG(blockChecksum);
+        bufferOffset = 0;
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+        finished = false;
+        writeHeader();
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+        this(out, blockSize, false);
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The output stream to compress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+        this(out, BLOCKSIZE_64KB);
+    }
+
+    /**
+     * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+     *
+     * @throws IOException
+     */
+    private void writeHeader() throws IOException {
+        Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+        bufferOffset = 4;
+        buffer[bufferOffset++] = flg.toByte();
+        buffer[bufferOffset++] = bd.toByte();
+        // TODO write uncompressed content size, update flg.validate()
+        // TODO write dictionary id, update flg.validate()
+        // compute checksum on all descriptor fields
+        int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
+        buffer[bufferOffset++] = (byte) hash;
+        // write out frame descriptor
+        out.write(buffer, 0, bufferOffset);
+        bufferOffset = 0;
+    }
+
+    /**
+     * Compresses buffered data, optionally computes an XXHash32 checksum, and writes
+     * the result to the underlying {@link OutputStream}.
+     *
+     * @throws IOException
+     */
+    private void writeBlock() throws IOException {
+        if (bufferOffset == 0) {
+            return;
+        }
+
+        int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+        byte[] bufferToWrite = compressedBuffer;
+        int compressMethod = 0;
+
+        // Store block uncompressed if compressed length is greater (incompressible)
+        if (compressedLength >= bufferOffset) {
+            bufferToWrite = buffer;
+            compressedLength = bufferOffset;
+            compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+        }
+
+        // Write content
+        Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+        out.write(bufferToWrite, 0, compressedLength);
+
+        // Calculate and write block checksum
+        if (flg.isBlockChecksumSet()) {
+            int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+            Utils.writeUnsignedIntLE(out, hash);
+        }
+        bufferOffset = 0;
+    }
+
+    /**
+     * Similar to the {@link #writeBlock()} method. Writes a 0-length block
+     * (without block checksum) to signal the end of the block stream.
+     *
+     * @throws IOException
+     */
+    private void writeEndMark() throws IOException {
+        Utils.writeUnsignedIntLE(out, 0);
+        // TODO implement content checksum, update flg.validate()
+        finished = true;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ensureNotFinished();
+        if (bufferOffset == maxBlockSize) {
+            writeBlock();
+        }
+        buffer[bufferOffset++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        ensureNotFinished();
+
+        int bufferRemainingLength = maxBlockSize - bufferOffset;
+        // while b will fill the buffer
+        while (len > bufferRemainingLength) {
+            // fill remaining space in buffer
+            System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+            bufferOffset = maxBlockSize;
+            writeBlock();
+            // compute new offset and length
+            off += bufferRemainingLength;
+            len -= bufferRemainingLength;
+            bufferRemainingLength = maxBlockSize;
+        }
+
+        System.arraycopy(b, off, buffer, bufferOffset, len);
+        bufferOffset += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (!finished) {
+            writeBlock();
+        }
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    /**
+     * A simple state check to ensure the stream is still open.
+ */ + private void ensureNotFinished() { + if (finished) { + throw new IllegalStateException(CLOSED_STREAM); + } + } + + @Override + public void close() throws IOException { + if (!finished) { + writeEndMark(); + flush(); + finished = true; + } + if (out != null) { + out.close(); + out = null; + } + } + + public static class FLG { + + private static final int VERSION = 1; + + private final int presetDictionary; + private final int reserved1; + private final int contentChecksum; + private final int contentSize; + private final int blockChecksum; + private final int blockIndependence; + private final int version; + + public FLG() { + this(false); + } + + public FLG(boolean blockChecksum) { + this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION); + } + + private FLG(int presetDictionary, int reserved1, int contentChecksum, + int contentSize, int blockChecksum, int blockIndependence, int version) { + this.presetDictionary = presetDictionary; + this.reserved1 = reserved1; + this.contentChecksum = contentChecksum; + this.contentSize = contentSize; + this.blockChecksum = blockChecksum; + this.blockIndependence = blockIndependence; + this.version = version; + validate(); + } + + public static FLG fromByte(byte flg) { + int presetDictionary = (flg >>> 0) & 1; + int reserved1 = (flg >>> 1) & 1; + int contentChecksum = (flg >>> 2) & 1; + int contentSize = (flg >>> 3) & 1; + int blockChecksum = (flg >>> 4) & 1; + int blockIndependence = (flg >>> 5) & 1; + int version = (flg >>> 6) & 3; + + return new FLG(presetDictionary, reserved1, contentChecksum, + contentSize, blockChecksum, blockIndependence, version); + } + + public byte toByte() { + return (byte) ( + ((presetDictionary & 1) << 0) + | ((reserved1 & 1) << 1) + | ((contentChecksum & 1) << 2) + | ((contentSize & 1) << 3) + | ((blockChecksum & 1) << 4) + | ((blockIndependence & 1) << 5) + | ((version & 3) << 6) ); + } + + private void validate() { + if (presetDictionary != 0) { + throw new RuntimeException("Preset dictionary is unsupported"); + } + if (reserved1 != 0) { + throw new RuntimeException("Reserved1 field must be 0"); + } + if (contentChecksum != 0) { + throw new RuntimeException("Content checksum is unsupported"); + } + if (contentSize != 0) { + throw new RuntimeException("Content size is unsupported"); + } + if (blockIndependence != 1) { + throw new RuntimeException("Dependent block stream is unsupported"); + } + if (version != VERSION) { + throw new RuntimeException(String.format("Version %d is unsupported", version)); + } + } + + public boolean isPresetDictionarySet() { + return presetDictionary == 1; + } + + public boolean isContentChecksumSet() { + return contentChecksum == 1; + } + + public boolean isContentSizeSet() { + return contentSize == 1; + } + + public boolean isBlockChecksumSet() { + return blockChecksum == 1; + } + + public boolean isBlockIndependenceSet() { + return blockIndependence == 1; + } + + public int getVersion() { + return version; + } + } + + public static class BD { + + private final int reserved2; + private final int blockSizeValue; + private final int reserved3; + + public BD() { + this(0, BLOCKSIZE_64KB, 0); + } + + public BD(int blockSizeValue) { + this(0, blockSizeValue, 0); + } + + private BD(int reserved2, int blockSizeValue, int reserved3) { + this.reserved2 = reserved2; + this.blockSizeValue = blockSizeValue; + this.reserved3 = reserved3; + validate(); + } + + public static BD fromByte(byte bd) { + int reserved2 = (bd >>> 0) & 15; + int blockMaximumSize = (bd >>> 4) & 7; + int reserved3 = (bd >>> 7) & 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 5227b2d7ab8..65a7e432379 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
 
     public final int id;
     public final String name;
@@ -42,8 +42,6 @@ public enum CompressionType {
                 return SNAPPY;
             case 3:
                 return LZ4;
-            case 4:
-                return LZ4HC;
             default:
                 throw new IllegalArgumentException("Unknown compression type id: " + id);
         }
@@ -58,8 +56,6 @@ public enum CompressionType {
             return SNAPPY;
         else if (LZ4.name.equals(name))
             return LZ4;
-        else if (LZ4HC.name.equals(name))
-            return LZ4HC;
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
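The surviving lookup paths behave as before for the four remaining codecs; a quick illustrative check:

    import org.apache.kafka.common.record.CompressionType;

    CompressionType byName = CompressionType.forName("lz4"); // LZ4
    CompressionType byId = CompressionType.forId(3);         // LZ4
    // CompressionType.forName("lz4hc") and forId(4) now throw IllegalArgumentException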
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 0323f5f7032..d684e6833bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -218,27 +218,13 @@ public class Compressor {
                 }
             case LZ4:
                 try {
-                    Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-                    OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class)
+                    Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+                    OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
                         .newInstance(buffer);
                     return new DataOutputStream(stream);
                 } catch (Exception e) {
                     throw new KafkaException(e);
                 }
-            case LZ4HC:
-                try {
-                    Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
-                    Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
-                    Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-                    Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
-                    Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
-                    OutputStream stream = (OutputStream) lz4BlockOutputStream
-                        .getConstructor(OutputStream.class, Integer.TYPE, compressorClass)
-                        .newInstance(buffer, 1 << 16, compressor);
-                    return new DataOutputStream(stream);
-                } catch (Exception e) {
-                    throw new KafkaException(e);
-                }
             default:
                 throw new IllegalArgumentException("Unknown compression type: " + type);
         }
@@ -266,10 +252,9 @@ public class Compressor {
                     throw new KafkaException(e);
                 }
             case LZ4:
-            case LZ4HC:
                 // dynamically load LZ4 class to avoid runtime dependency
                 try {
-                    Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream");
+                    Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
                     InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
                         .newInstance(buffer);
                     return new DataInputStream(stream);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a0827f576e8..527dd0f9c47 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -12,6 +12,9 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
@@ -74,6 +77,34 @@ public class Utils {
         return buffer.getInt(index) & 0xffffffffL;
     }
 
+    /**
+     * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
+     *
+     * @param in The stream to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(InputStream in) throws IOException {
+        return (in.read() << 8*0)
+             | (in.read() << 8*1)
+             | (in.read() << 8*2)
+             | (in.read() << 8*3);
+    }
+
+    /**
+     * Read an unsigned integer stored in little-endian format from a byte array
+     * at a given offset.
+     *
+     * @param buffer The byte array to read from
+     * @param offset The position in buffer to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(byte[] buffer, int offset) {
+        return ((buffer[offset++] & 0xff) << 8*0)  // mask to 0xff so sign extension cannot corrupt higher bytes
+             | ((buffer[offset++] & 0xff) << 8*1)
+             | ((buffer[offset++] & 0xff) << 8*2)
+             | ((buffer[offset] & 0xff) << 8*3);
+    }
+
     /**
      * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
      *
@@ -95,6 +126,35 @@ public class Utils {
         buffer.putInt(index, (int) (value & 0xffffffffL));
     }
 
+    /**
+     * Write an unsigned integer in little-endian format to the {@link OutputStream}.
+     *
+     * @param out The stream to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
+        out.write(value >>> 8*0);
+        out.write(value >>> 8*1);
+        out.write(value >>> 8*2);
+        out.write(value >>> 8*3);
+    }
+
+    /**
+     * Write an unsigned integer in little-endian format to a byte array
+     * at a given offset.
+     *
+     * @param buffer The byte array to write to
+     * @param offset The position in buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
+        buffer[offset++] = (byte) (value >>> 8*0);
+        buffer[offset++] = (byte) (value >>> 8*1);
+        buffer[offset++] = (byte) (value >>> 8*2);
+        buffer[offset] = (byte) (value >>> 8*3);
+    }
+
+
     /**
      * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
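A short worked example of the new little-endian helpers, using the LZ4 frame magic as the value; note that the least-significant byte is written first:

    byte[] scratch = new byte[4];
    Utils.writeUnsignedIntLE(scratch, 0, 0x184D2204);
    // scratch now holds {0x04, 0x22, 0x4D, 0x18}
    int magic = Utils.readUnsignedIntLE(scratch, 0); // 0x184D2204
    // the & 0xff masks in readUnsignedIntLE keep bytes >= 0x80 (e.g. a length
    // word carrying LZ4_FRAME_INCOMPRESSIBLE_MASK) from sign-extending into
    // the higher bytes of the result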
diff --git a/config/producer.properties b/config/producer.properties
index 39d65d7c6c2..47ae3e24130 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
 
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
-# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4.
+# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
 compression.codec=none
 
 # message encoder
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index de0a0fade53..9439d2bc29a 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -24,7 +24,6 @@ object CompressionCodec {
       case GZIPCompressionCodec.codec => GZIPCompressionCodec
       case SnappyCompressionCodec.codec => SnappyCompressionCodec
       case LZ4CompressionCodec.codec => LZ4CompressionCodec
-      case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
     }
   }
@@ -34,7 +33,6 @@ object CompressionCodec {
       case GZIPCompressionCodec.name => GZIPCompressionCodec
       case SnappyCompressionCodec.name => SnappyCompressionCodec
       case LZ4CompressionCodec.name => LZ4CompressionCodec
-      case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
     }
   }
@@ -62,11 +60,6 @@ case object LZ4CompressionCodec extends CompressionCodec {
   val name = "lz4"
 }
 
-case object LZ4HCCompressionCodec extends CompressionCodec {
-  val codec = 4
-  val name = "lz4hc"
-}
-
 case object NoCompressionCodec extends CompressionCodec {
   val codec = 0
   val name = "none"
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index 8420e13d0d8..c721040bd04 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -22,6 +22,8 @@ import java.util.zip.GZIPOutputStream
 import java.util.zip.GZIPInputStream
 import java.io.InputStream
 
+import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+
 object CompressionFactory {
 
   def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
@@ -32,11 +34,7 @@ object CompressionFactory {
       case SnappyCompressionCodec =>
         import org.xerial.snappy.SnappyOutputStream
         new SnappyOutputStream(stream)
       case LZ4CompressionCodec =>
-        import net.jpountz.lz4.LZ4BlockOutputStream
-        new LZ4BlockOutputStream(stream)
-      case LZ4HCCompressionCodec =>
-        import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
-        new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
+        new KafkaLZ4BlockOutputStream(stream)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
@@ -49,9 +47,8 @@ object CompressionFactory {
       case SnappyCompressionCodec =>
         import org.xerial.snappy.SnappyInputStream
         new SnappyInputStream(stream)
-      case LZ4CompressionCodec | LZ4HCCompressionCodec =>
-        import net.jpountz.lz4.LZ4BlockInputStream
-        new LZ4BlockInputStream(stream)
+      case LZ4CompressionCodec =>
+        new KafkaLZ4BlockInputStream(stream)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
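Since both directions of CompressionFactory now run through the new stream classes, a round trip is the easiest smoke test. A sketch in Java mirroring what the Scala factory wires up; the payload is arbitrary:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;
    import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

    byte[] payload = "hello, lz4 frame".getBytes("UTF-8");

    ByteArrayOutputStream framed = new ByteArrayOutputStream();
    KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(framed); // 64 KB blocks, no block checksums
    out.write(payload, 0, payload.length);
    out.close(); // flushes the last block and writes the end mark

    KafkaLZ4BlockInputStream in =
        new KafkaLZ4BlockInputStream(new ByteArrayInputStream(framed.toByteArray()));
    byte[] decoded = new byte[payload.length];
    int read = in.read(decoded, 0, decoded.length); // decodes the single block
    in.close();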
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index b024a693c23..397d80da08c 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -113,7 +113,7 @@ object ConsoleProducer {
     .describedAs("broker-list")
     .ofType(classOf[String])
   val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-  val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." +
+  val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'. " +
                                            "If specified without value, then it defaults to 'gzip'")
     .withOptionalArg()
     .describedAs("compression-codec")
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
index c72002976d9..d073acf9adf 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
     .defaultsTo(200)
   val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
     .withRequiredArg
-    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4")
+    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3")
    .ofType(classOf[java.lang.Integer])
    .defaultsTo(0)
  val helpOpt = parser.accepts("help", "Print usage.")
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index c95485170fd..6379f2b60af 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -47,7 +47,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
   private val config = new KafkaConfig(props)
 
   private val topic = "topic"
-  private val numRecords = 100
+  private val numRecords = 2000
 
   @Before
   override def setUp() {
@@ -73,6 +73,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
     val props = new Properties()
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
+    props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
     var producer = new KafkaProducer(props)
     val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
 
@@ -125,7 +127,6 @@ object ProducerCompressionTest {
     list.add(Array("gzip"))
     list.add(Array("snappy"))
     list.add(Array("lz4"))
-    list.add(Array("lz4hc"))
     list
   }
 }
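A plausible reading of the new test numbers (an inference from the constants above, not stated in the patch): a batch.size of 66000 bytes just exceeds the writer's 64 KB default block, so a full batch must roll over into a second LZ4 block, and 2000 records with a 200 ms linger give batches time to actually fill:

    int defaultBlock = 1 << ((2 * 4) + 8);          // 65536 bytes for BLOCKSIZE_64KB
    boolean spansTwoBlocks = 66000 > defaultBlock;  // true -> exercises the block-rollover path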
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 6f0addcea64..0bb275d0dc8 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -32,8 +32,6 @@ class MessageCompressionTest extends JUnitSuite {
       codecs += SnappyCompressionCodec
     if(isLZ4Available)
       codecs += LZ4CompressionCodec
-    if (izLZ4HCAvailable)
-      codecs += LZ4HCCompressionCodec
     for(codec <- codecs)
       testSimpleCompressDecompress(codec)
   }
@@ -74,14 +72,4 @@ class MessageCompressionTest extends JUnitSuite {
       case e: UnsatisfiedLinkError => false
     }
   }
-
-  def izLZ4HCAvailable(): Boolean = {
-    try {
-      val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16,
-        net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor())
-      true
-    } catch {
-      case e: UnsatisfiedLinkError => false
-    }
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 958c1a60069..7b74a0d3154 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite {
   def setUp(): Unit = {
     val keys = Array(null, "key".getBytes, "".getBytes)
     val vals = Array("value".getBytes, "".getBytes, null)
-    val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec)
+    val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec)
     for(k <- keys; v <- vals; codec <- codecs)
       messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
   }