KAFKA-3704; Revert "Remove hard-coded block size in KafkaProducer"

This is not an exact revert as the code changed a bit since the
original commit. We also include a note in `upgrade.html`.

The original commit is 1182d61deb.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira, Guozhang Wang

Closes #1391 from ijuma/kafka-3704-revert and squashes the following commits:

7891b67 [Ismael Juma] Tweak upgrade note based on Gwen's feedback
1673cd0 [Ismael Juma] Revert "KAFKA-3704: Remove hard-coded block size in KafkaProducer"
Authored by Ismael Juma on 2016-05-16 12:00:06 -07:00; committed by Gwen Shapira
commit 9a3fcf4135, parent 61f4c8b092
2 changed files with 17 additions and 9 deletions

clients/src/main/java/org/apache/kafka/common/record/Compressor.java

@@ -33,6 +33,7 @@ public class Compressor {
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
@@ -52,7 +53,7 @@ public class Compressor {
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class);
+                .getConstructor(OutputStream.class, Integer.TYPE);
         }
     });
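The restored lookup resolves the two-argument SnappyOutputStream(OutputStream, int) constructor so the block size can be passed explicitly. A minimal standalone sketch of the same reflective pattern, assuming snappy-java is on the classpath (the class and variable names here are illustrative, not from the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.lang.reflect.Constructor;

    public class SnappyReflectionSketch {
        public static void main(String[] args) throws Exception {
            // Resolve SnappyOutputStream(OutputStream, int) without a compile-time
            // dependency on snappy-java; Integer.TYPE selects the primitive int parameter.
            Constructor<?> ctor = Class.forName("org.xerial.snappy.SnappyOutputStream")
                    .getConstructor(OutputStream.class, Integer.TYPE);
            // Instantiate with an explicit 1 KB buffer, mirroring COMPRESSION_DEFAULT_BUFFER_SIZE.
            OutputStream snappyOut = (OutputStream) ctor.newInstance(new ByteArrayOutputStream(), 1024);
            snappyOut.close();
        }
    }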
@@ -107,7 +108,7 @@ public class Compressor {
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type);
+        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
     }
 
     public ByteBuffer buffer() {
@@ -241,16 +242,16 @@ public class Compressor {
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
         try {
             switch (type) {
                 case NONE:
                     return new DataOutputStream(buffer);
                 case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer));
+                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                 case SNAPPY:
                     try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
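For the gzip path, the explicit size goes to the JDK's GZIPOutputStream(OutputStream, int) constructor. A self-contained sketch of the restored shape of wrapForOutput for the NONE and GZIP cases, with the 1 KB default applied at the call site (class and method names are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    public class WrapForOutputSketch {
        // Simplified analogue of the restored wrapForOutput: the caller supplies the
        // buffer size instead of letting each codec fall back to its own default.
        static DataOutputStream wrap(OutputStream out, boolean gzip, int bufferSize) throws IOException {
            return gzip
                    ? new DataOutputStream(new GZIPOutputStream(out, bufferSize))
                    : new DataOutputStream(out);
        }

        public static void main(String[] args) throws IOException {
            try (DataOutputStream stream = wrap(new ByteArrayOutputStream(), true, 1024)) {
                stream.writeUTF("hello"); // compressed through a 1 KB gzip buffer
            }
        }
    }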

docs/upgrade.html

@@ -68,10 +68,18 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
     message throughput degradation because of the increased overhead.
     Likewise, replication now transmits an additional 8 bytes per message.
     If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards
-    and see failures and performance issues due to the overload. When receiving compressed messages, 0.10.0
+    and see failures and performance issues due to the overload.
+    </p>
+    <p>
+    <b>Note:</b> If you have enabled compression on producers, you may notice reduced producer throughput and/or
+    lower compression rate on the broker in some cases. When receiving compressed messages, 0.10.0
     brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In
-    certain cases, this may reduce the batching size on the producer, which could lead to worse throughput. If this
-    happens, users can tune linger.ms and batch.size of the producer for better throughput.
+    certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this
+    happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer
+    used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative
+    impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka
+    release.
+    </p>
     <p>
     </p>
     <h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">Potential breaking changes in 0.10.0.0</a></h5>
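To make the tuning advice in the note above concrete, here is a sketch of the two producer settings it names; the values, broker address, and class name are illustrative examples only, not recommendations from the patch:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;

    public class TunedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("compression.type", "snappy");
            // The two knobs named in the note; example values only.
            props.put("batch.size", "32768"); // bytes per partition batch; larger batches compress better
            props.put("linger.ms", "10");     // wait up to 10 ms for a batch to fill
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // send records as usual
            }
        }
    }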
@@ -101,7 +109,6 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
 <ul>
     <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li>
-    <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 2x32 KB and 2x64KB respectively. For the snappy case, a producer with 5000 partitions will require an additional 315 MB of JVM heap.</li>
     <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
     <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
     <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>
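As a sanity check on the heap figure in the removed bullet: snappy allocates two block-sized buffers per stream, so with its 32 KB default block the arithmetic for 5000 partitions works out roughly as cited. A back-of-the-envelope sketch (illustrative, not code from the patch):

    public class SnappyHeapEstimate {
        public static void main(String[] args) {
            long partitions = 5_000;
            long blockSize = 32 * 1024;               // snappy's default block size (32 KB)
            long perStream = 2 * blockSize;           // one input plus one output buffer
            long totalBytes = partitions * perStream; // 327,680,000 bytes
            // ~312.5 MiB, in line with the roughly 315 MB cited in the removed note
            System.out.printf("~%.1f MiB of extra heap%n", totalBytes / (1024.0 * 1024.0));
        }
    }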