diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 60c15e60bff..e23a52e710f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -33,6 +33,7 @@ public class Compressor {
 
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
 
@@ -52,7 +53,7 @@ public class Compressor {
             @Override
             public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
                 return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                    .getConstructor(OutputStream.class);
+                    .getConstructor(OutputStream.class, Integer.TYPE);
             }
         });
 
@@ -107,7 +108,7 @@ public class Compressor {
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type);
+        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
     }
 
     public ByteBuffer buffer() {
@@ -241,16 +242,16 @@ public class Compressor {
 
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        try {
            switch (type) {
                case NONE:
                    return new DataOutputStream(buffer);
                case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer));
+                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                case SNAPPY:
                    try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
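For readers skimming the patch, a minimal standalone sketch of what the patched wrapForOutput now does. The class name WrapForOutputSketch and the String-keyed switch are illustrative stand-ins (the real method takes Kafka's CompressionType enum and a ByteBufferOutputStream); the GZIPOutputStream(OutputStream, int) constructor and the reflective lookup of SnappyOutputStream(OutputStream, int) mirror the code above.

```java
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.util.zip.GZIPOutputStream;

public class WrapForOutputSketch {

    // Mirrors the new constant above: a fixed 1 KB buffer on the producer side.
    private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;

    public static DataOutputStream wrap(OutputStream out, String type, int bufferSize) throws Exception {
        switch (type) {
            case "none":
                return new DataOutputStream(out);
            case "gzip":
                // java.util.zip.GZIPOutputStream provides an (OutputStream, int size) constructor.
                return new DataOutputStream(new GZIPOutputStream(out, bufferSize));
            case "snappy":
                // snappy-java is an optional dependency, so the two-argument
                // constructor (OutputStream, int blockSize) is resolved via
                // reflection, as in the patch above.
                Constructor<?> ctor = Class.forName("org.xerial.snappy.SnappyOutputStream")
                        .getConstructor(OutputStream.class, Integer.TYPE);
                return new DataOutputStream((OutputStream) ctor.newInstance(out, bufferSize));
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    }

    public static void main(String[] args) throws Exception {
        // Writes a small compressed payload to an in-memory sink.
        java.io.ByteArrayOutputStream sink = new java.io.ByteArrayOutputStream();
        try (DataOutputStream outStream = wrap(sink, "gzip", COMPRESSION_DEFAULT_BUFFER_SIZE)) {
            outStream.writeUTF("hello, kafka");
        }
        System.out.println("compressed size: " + sink.size());
    }
}
```

One detail worth noting: passing Integer.TYPE rather than Integer.class matters here, because getConstructor matches parameter types exactly and the Snappy constructor takes a primitive int.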

diff --git a/docs/upgrade.html b/docs/upgrade.html
index 2972f261e5a..d09b9d76bea 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,10 +68,18 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
     message throughput degradation because of the increased overhead. Likewise, replication now transmits an additional
     8 bytes per message. If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards
-    and see failures and performance issues due to the overload. When receiving compressed messages, 0.10.0
+    and see failures and performance issues due to the overload. <br>
+
+    Note: If you have enabled compression on producers, you may notice reduced producer throughput and/or
+    lower compression rate on the broker in some cases. When receiving compressed messages, 0.10.0
     brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In
-    certain cases, this may reduce the batching size on the producer, which could lead to worse throughput. If this
-    happens, users can tune linger.ms and batch.size of the producer for better throughput.
+    certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this
+    happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer
+    used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative
+    impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka
+    release. <br>
+
+    <br>
 
 Potential breaking changes in 0.10.0.0
 
@@ -101,7 +109,6 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
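Since the upgrade note above points users at linger.ms and batch.size, here is a sketch of the producer-side tuning it describes, using the standard Java client. The broker address, topic name, and the specific values are illustrative starting points, not recommendations from this patch; the general idea is that larger, fuller batches give the compressor more data at a time, which tends to improve the on-disk compression ratio.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressionTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // Wait a little before sending so more records accumulate into each batch.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "20");      // default: 0
        // Allow larger batches; bigger batches generally compress better.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");  // default: 16384
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}
```

Whether raising these values helps depends on the workload: linger.ms trades a little latency for fuller batches, and batch.size only sets an upper bound, so measuring throughput and the broker-side compression ratio before and after is the sensible way to validate any change.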