mirror of https://github.com/apache/kafka.git
KAFKA-3704: Remove hard-coded block size in KafkaProducer
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Ismael Juma
Closes #1371 from guozhangwang/K3565-remove-compression-blocksize
parent f892f0ca6d
commit 1182d61deb
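Summary of the API change (reconstructed from the hunks below; only the affected signatures are shown, not the full class):

    // Before this commit: block size was a constructor/wrapper parameter,
    // defaulting to the hard-coded 1 KB COMPRESSION_DEFAULT_BUFFER_SIZE.
    public Compressor(ByteBuffer buffer, CompressionType type, int blockSize)
    public Compressor(ByteBuffer buffer, CompressionType type)   // delegates with 1 KB
    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer,
                                                 CompressionType type, int bufferSize)

    // After this commit: the parameter is gone; each codec falls back to its
    // library default (gzip 0.5 KB, snappy 32 KB, LZ4 64 KB).
    public Compressor(ByteBuffer buffer, CompressionType type)
    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer,
                                                 CompressionType type)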
@@ -33,7 +33,6 @@ public class Compressor {
 
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
 
@@ -53,7 +52,7 @@ public class Compressor {
             @Override
             public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
                 return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                    .getConstructor(OutputStream.class, Integer.TYPE);
+                    .getConstructor(OutputStream.class);
             }
         });
 
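Context for the hunk above: SnappyOutputStream is looked up reflectively so that snappy-java stays an optional dependency, and the commit now requests only the single-argument constructor, leaving the block size to the library. A self-contained sketch of that pattern (class name and error handling here are illustrative, not Kafka's exact code):

    import java.io.OutputStream;
    import java.lang.reflect.Constructor;

    final class SnappyReflectionSketch {
        private static volatile Constructor<?> snappyCtor;

        // Wrap 'out' in a SnappyOutputStream without a compile-time dependency
        // on snappy-java; the block size is left to the library default.
        static OutputStream wrapWithSnappy(OutputStream out) {
            try {
                if (snappyCtor == null)
                    snappyCtor = Class.forName("org.xerial.snappy.SnappyOutputStream")
                                      .getConstructor(OutputStream.class);
                return (OutputStream) snappyCtor.newInstance(out);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("snappy-java not on the classpath", e);
            }
        }
    }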
@@ -91,7 +90,7 @@ public class Compressor {
     public float compressionRate;
     public long maxTimestamp;
 
-    public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
+    public Compressor(ByteBuffer buffer, CompressionType type) {
         this.type = type;
         this.initPos = buffer.position();
 
@@ -108,11 +107,7 @@ public class Compressor {
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type, blockSize);
-    }
-
-    public Compressor(ByteBuffer buffer, CompressionType type) {
-        this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
+        appendStream = wrapForOutput(bufferStream, type);
     }
 
     public ByteBuffer buffer() {
@@ -246,16 +241,16 @@ public class Compressor {
 
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) {
         try {
             switch (type) {
                 case NONE:
                     return new DataOutputStream(buffer);
                 case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+                    return new DataOutputStream(new GZIPOutputStream(buffer));
                 case SNAPPY:
                     try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
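What the one-argument wrapping resolves to, taking gzip as the concrete case: the JDK's single-argument GZIPOutputStream constructor uses a 512-byte deflater buffer instead of the 1 KB the old code pinned. A standalone sketch (names here are mine, not Kafka's):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.util.zip.GZIPOutputStream;

    public class GzipDefaultSketch {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            // Single-argument constructor: 512-byte internal buffer (JDK default).
            try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(sink))) {
                out.writeUTF("hello, kafka");
            }
            System.out.println("gzip-framed bytes: " + sink.size());
        }
    }

Note that for gzip the buffer size only affects I/O chunking; it is the block-oriented codecs, snappy and LZ4 with their 32 KB and 64 KB default blocks, where the larger default actually improves the compression ratio.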
@@ -146,7 +146,7 @@ public final class Record {
     public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
         // construct the compressor with compression type none since this function will not do any
         // compression according to the input type, it will just write the record's payload as is
-        Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
         compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
     }
 
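Why dropping the buffer.capacity() argument is behavior-neutral here: with CompressionType.NONE, wrapForOutput (see the NONE case above) returns a plain DataOutputStream over the buffer, so no compression buffer is ever allocated and the old third argument was effectively unused. A trivial standalone check of that pass-through (my names, not Kafka's):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class NonePassThroughSketch {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(sink)) {
                out.write(new byte[]{1, 2, 3}); // written as-is, no framing
            }
            System.out.println(sink.size()); // prints 3
        }
    }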
@@ -91,6 +91,7 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
 <ul>
     <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li>
+    <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 32 KB and 64 KB respectively. For the snappy case, a producer with 5000 partitions will require an additional 155 MB of JVM heap.</li>
     <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
     <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
     <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>
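A back-of-the-envelope check of the heap figure in the note above, assuming one 32 KB snappy block buffer per partition batch (the per-stream overhead snappy-java adds on top of the block is not modeled):

    public class SnappyHeapEstimate {
        public static void main(String[] args) {
            int partitions = 5000;
            long blockBytes = 32 * 1024;           // snappy-java default block size
            long extra = partitions * blockBytes;  // vs. roughly 5 MB total at the old 1 KB
            System.out.printf("extra heap ~= %.0f MB%n", extra / (1024.0 * 1024.0));
            // prints: extra heap ~= 156 MB, in line with the ~155 MB quoted above
        }
    }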