From 1182d61deb23b5cd86cbe462471f7df583a796e1 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 11 May 2016 17:01:14 -0700
Subject: [PATCH] KAFKA-3704: Remove hard-coded block size in KafkaProducer

Author: Guozhang Wang

Reviewers: Ismael Juma

Closes #1371 from guozhangwang/K3565-remove-compression-blocksize
---
 .../apache/kafka/common/record/Compressor.java    | 17 ++++++-----------
 .../org/apache/kafka/common/record/Record.java    |  2 +-
 docs/upgrade.html                                 |  1 +
 3 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 37d53b81dcb..60c15e60bff 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -33,7 +33,6 @@ public class Compressor {
 
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
@@ -53,7 +52,7 @@ public class Compressor {
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class, Integer.TYPE);
+                .getConstructor(OutputStream.class);
         }
     });
@@ -91,7 +90,7 @@ public class Compressor {
     public float compressionRate;
     public long maxTimestamp;
 
-    public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
+    public Compressor(ByteBuffer buffer, CompressionType type) {
         this.type = type;
         this.initPos = buffer.position();
@@ -108,11 +107,7 @@ public class Compressor {
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type, blockSize);
-    }
-
-    public Compressor(ByteBuffer buffer, CompressionType type) {
-        this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
+        appendStream = wrapForOutput(bufferStream, type);
     }
 
     public ByteBuffer buffer() {
@@ -246,16 +241,16 @@ public class Compressor {
 
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) {
         try {
             switch (type) {
                 case NONE:
                     return new DataOutputStream(buffer);
                 case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+                    return new DataOutputStream(new GZIPOutputStream(buffer));
                 case SNAPPY:
                     try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
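For context on the Compressor.java hunks above: with the block-size parameter
gone, each codec falls back to its library default instead of the old
hard-coded 1 KB buffer (the JDK's GZIPOutputStream defaults to a 512-byte
deflate buffer, and snappy-java's SnappyOutputStream to a 32 KB block size,
per their documentation). A minimal sketch of the simplified call path,
assuming the package-local classes shown in the patch; the class name, buffer
size, and payload are illustrative, not part of the patch:

    import java.io.DataOutputStream;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferOutputStream;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Compressor;

    public class WrapForOutputSketch {
        public static void main(String[] args) throws Exception {
            // Target buffer that the compressed bytes are written into.
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(buffer);

            // No block-size argument any more: the codec picks its own default.
            DataOutputStream out = Compressor.wrapForOutput(bufferStream, CompressionType.GZIP);
            out.writeBytes("record payload"); // illustrative payload
            out.close();                      // flushes the GZIP trailer into the buffer

            System.out.println("compressed bytes written: " + buffer.position());
        }
    }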
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 147ad86986b..baab9ab6f1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -146,7 +146,7 @@ public final class Record {
     public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
         // construct the compressor with compression type none since this function will not do any
         // compression according to the input type, it will just write the record's payload as is
-        Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
         compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
     }
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4b8ec7eb9f0..3c98540ae07 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -91,6 +91,7 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
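For the Record.java hunk above, a sketch of how the simplified helper might be
invoked, based only on the signature visible in the patch; the class name, key,
value, and timestamp are illustrative. Record.write passes CompressionType.NONE
to the Compressor internally because it writes the payload as-is without
re-compressing:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Record;

    public class RecordWriteSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            byte[] key = "k".getBytes(StandardCharsets.UTF_8);
            byte[] value = "v".getBytes(StandardCharsets.UTF_8);

            // Writes the record into the buffer as-is; the type argument only
            // records which compression the payload already carries.
            Record.write(buffer, System.currentTimeMillis(), key, value,
                         CompressionType.NONE, 0, value.length);
        }
    }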