mirror of https://github.com/apache/kafka.git
KAFKA-17227: Refactor compression code to only load codecs when used (#16782)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Josep Prat <josep.prat@aiven.io>
This commit is contained in:
parent
c18463ec2d
commit
7c5d339d07
|
@ -76,7 +76,10 @@
|
|||
<allow pkg="net.jpountz.xxhash" />
|
||||
<allow pkg="org.xerial.snappy" />
|
||||
<allow pkg="org.apache.kafka.common.compress" />
|
||||
<allow class="org.apache.kafka.common.record.CompressionType" exact-match="true" />
|
||||
<allow class="org.apache.kafka.common.record.CompressionType" />
|
||||
<allow class="org.apache.kafka.common.record.CompressionType.GZIP" />
|
||||
<allow class="org.apache.kafka.common.record.CompressionType.LZ4" />
|
||||
<allow class="org.apache.kafka.common.record.CompressionType.ZSTD" />
|
||||
<allow class="org.apache.kafka.common.record.RecordBatch" exact-match="true" />
|
||||
</subpackage>
|
||||
|
||||
|
@ -150,6 +153,7 @@
|
|||
</subpackage>
|
||||
|
||||
<subpackage name="record">
|
||||
<allow class="org.apache.kafka.common.config.ConfigDef.Range.between" exact-match="true" />
|
||||
<allow pkg="org.apache.kafka.common.compress" />
|
||||
<allow pkg="org.apache.kafka.common.header" />
|
||||
<allow pkg="org.apache.kafka.common.record" />
|
||||
|
|
|
@ -19,9 +19,6 @@ package org.apache.kafka.clients.producer;
|
|||
import org.apache.kafka.clients.ClientDnsLookup;
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.clients.MetadataRecoveryStrategy;
|
||||
import org.apache.kafka.common.compress.GzipCompression;
|
||||
import org.apache.kafka.common.compress.Lz4Compression;
|
||||
import org.apache.kafka.common.compress.ZstdCompression;
|
||||
import org.apache.kafka.common.config.AbstractConfig;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
|
@ -381,9 +378,9 @@ public class ProducerConfig extends AbstractConfig {
|
|||
Importance.LOW,
|
||||
ACKS_DOC)
|
||||
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
|
||||
.define(COMPRESSION_GZIP_LEVEL_CONFIG, Type.INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
|
||||
.define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
|
||||
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
|
||||
.define(COMPRESSION_GZIP_LEVEL_CONFIG, Type.INT, CompressionType.GZIP.defaultLevel(), CompressionType.GZIP.levelValidator(), Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
|
||||
.define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
|
||||
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
|
||||
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
|
||||
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
|
||||
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
package org.apache.kafka.common.compress;
|
||||
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.record.CompressionType;
|
||||
import org.apache.kafka.common.utils.BufferSupplier;
|
||||
import org.apache.kafka.common.utils.ByteBufferInputStream;
|
||||
|
@ -30,14 +28,11 @@ import java.io.InputStream;
|
|||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
import java.util.zip.Deflater;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
public class GzipCompression implements Compression {
|
||||
import static org.apache.kafka.common.record.CompressionType.GZIP;
|
||||
|
||||
public static final int MIN_LEVEL = Deflater.BEST_SPEED;
|
||||
public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;
|
||||
public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION;
|
||||
public class GzipCompression implements Compression {
|
||||
|
||||
private final int level;
|
||||
|
||||
|
@ -47,7 +42,7 @@ public class GzipCompression implements Compression {
|
|||
|
||||
@Override
|
||||
public CompressionType type() {
|
||||
return CompressionType.GZIP;
|
||||
return GZIP;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -101,10 +96,10 @@ public class GzipCompression implements Compression {
|
|||
}
|
||||
|
||||
public static class Builder implements Compression.Builder<GzipCompression> {
|
||||
private int level = DEFAULT_LEVEL;
|
||||
private int level = GZIP.defaultLevel();
|
||||
|
||||
public Builder level(int level) {
|
||||
if ((level < MIN_LEVEL || MAX_LEVEL < level) && level != DEFAULT_LEVEL) {
|
||||
if ((level < GZIP.minLevel() || GZIP.maxLevel() < level) && level != GZIP.defaultLevel()) {
|
||||
throw new IllegalArgumentException("gzip doesn't support given compression level: " + level);
|
||||
}
|
||||
|
||||
|
@ -117,22 +112,4 @@ public class GzipCompression implements Compression {
|
|||
return new GzipCompression(level);
|
||||
}
|
||||
}
|
||||
|
||||
public static class LevelValidator implements ConfigDef.Validator {
|
||||
|
||||
@Override
|
||||
public void ensureValid(String name, Object o) {
|
||||
if (o == null)
|
||||
throw new ConfigException(name, null, "Value must be non-null");
|
||||
int level = ((Number) o).intValue();
|
||||
if (level > MAX_LEVEL || (level < MIN_LEVEL && level != DEFAULT_LEVEL)) {
|
||||
throw new ConfigException(name, o, "Value must be between " + MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " + DEFAULT_LEVEL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.kafka.common.compress;
|
||||
|
||||
import org.apache.kafka.common.record.CompressionType;
|
||||
import org.apache.kafka.common.utils.ByteUtils;
|
||||
|
||||
import net.jpountz.lz4.LZ4Compressor;
|
||||
|
@ -75,7 +76,7 @@ public final class Lz4BlockOutputStream extends OutputStream {
|
|||
*
|
||||
* For backward compatibility, Lz4BlockOutputStream uses fastCompressor with default compression level but, with the other level, it uses highCompressor.
|
||||
*/
|
||||
compressor = level == Lz4Compression.DEFAULT_LEVEL ? LZ4Factory.fastestInstance().fastCompressor() : LZ4Factory.fastestInstance().highCompressor(level);
|
||||
compressor = level == CompressionType.LZ4.defaultLevel() ? LZ4Factory.fastestInstance().fastCompressor() : LZ4Factory.fastestInstance().highCompressor(level);
|
||||
checksum = XXHashFactory.fastestInstance().hash32();
|
||||
this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
|
||||
bd = new BD(blockSize);
|
||||
|
|
|
@ -28,13 +28,9 @@ import java.io.OutputStream;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
|
||||
public class Lz4Compression implements Compression {
|
||||
import static org.apache.kafka.common.record.CompressionType.LZ4;
|
||||
|
||||
// These values come from net.jpountz.lz4.LZ4Constants
|
||||
// We may need to update them if the lz4 library changes these values.
|
||||
public static final int MIN_LEVEL = 1;
|
||||
public static final int MAX_LEVEL = 17;
|
||||
public static final int DEFAULT_LEVEL = 9;
|
||||
public class Lz4Compression implements Compression {
|
||||
|
||||
private final int level;
|
||||
|
||||
|
@ -44,7 +40,7 @@ public class Lz4Compression implements Compression {
|
|||
|
||||
@Override
|
||||
public CompressionType type() {
|
||||
return CompressionType.LZ4;
|
||||
return LZ4;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -89,10 +85,10 @@ public class Lz4Compression implements Compression {
|
|||
}
|
||||
|
||||
public static class Builder implements Compression.Builder<Lz4Compression> {
|
||||
private int level = DEFAULT_LEVEL;
|
||||
private int level = LZ4.defaultLevel();
|
||||
|
||||
public Builder level(int level) {
|
||||
if (level < MIN_LEVEL || MAX_LEVEL < level) {
|
||||
if (level < LZ4.minLevel() || LZ4.maxLevel() < level) {
|
||||
throw new IllegalArgumentException("lz4 doesn't support given compression level: " + level);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.ChunkedBytesStream;
|
|||
|
||||
import com.github.luben.zstd.BufferPool;
|
||||
import com.github.luben.zstd.RecyclingBufferPool;
|
||||
import com.github.luben.zstd.Zstd;
|
||||
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
|
||||
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
|
||||
|
||||
|
@ -37,11 +36,9 @@ import java.io.OutputStream;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ZstdCompression implements Compression {
|
||||
import static org.apache.kafka.common.record.CompressionType.ZSTD;
|
||||
|
||||
public static final int MIN_LEVEL = Zstd.minCompressionLevel();
|
||||
public static final int MAX_LEVEL = Zstd.maxCompressionLevel();
|
||||
public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel();
|
||||
public class ZstdCompression implements Compression {
|
||||
|
||||
private final int level;
|
||||
|
||||
|
@ -51,7 +48,7 @@ public class ZstdCompression implements Compression {
|
|||
|
||||
@Override
|
||||
public CompressionType type() {
|
||||
return CompressionType.ZSTD;
|
||||
return ZSTD;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,10 +122,10 @@ public class ZstdCompression implements Compression {
|
|||
}
|
||||
|
||||
public static class Builder implements Compression.Builder<ZstdCompression> {
|
||||
private int level = DEFAULT_LEVEL;
|
||||
private int level = ZSTD.defaultLevel();
|
||||
|
||||
public Builder level(int level) {
|
||||
if (MAX_LEVEL < level || level < MIN_LEVEL) {
|
||||
if (level < ZSTD.minLevel() || ZSTD.maxLevel() < level) {
|
||||
throw new IllegalArgumentException("zstd doesn't support given compression level: " + level);
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,13 @@
|
|||
*/
|
||||
package org.apache.kafka.common.record;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
|
||||
import java.util.zip.Deflater;
|
||||
|
||||
import static org.apache.kafka.common.config.ConfigDef.Range.between;
|
||||
|
||||
/**
|
||||
* The compression type to use
|
||||
*/
|
||||
|
@ -23,7 +30,46 @@ public enum CompressionType {
|
|||
NONE((byte) 0, "none", 1.0f),
|
||||
|
||||
// Shipped with the JDK
|
||||
GZIP((byte) 1, "gzip", 1.0f),
|
||||
GZIP((byte) 1, "gzip", 1.0f) {
|
||||
public static final int MIN_LEVEL = Deflater.BEST_SPEED;
|
||||
public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;
|
||||
public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION;
|
||||
|
||||
@Override
|
||||
public int defaultLevel() {
|
||||
return DEFAULT_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxLevel() {
|
||||
return MAX_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int minLevel() {
|
||||
return MIN_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigDef.Validator levelValidator() {
|
||||
return new ConfigDef.Validator() {
|
||||
@Override
|
||||
public void ensureValid(String name, Object o) {
|
||||
if (o == null)
|
||||
throw new ConfigException(name, null, "Value must be non-null");
|
||||
int level = ((Number) o).intValue();
|
||||
if (level > MAX_LEVEL || (level < MIN_LEVEL && level != DEFAULT_LEVEL)) {
|
||||
throw new ConfigException(name, o, "Value must be between " + MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " + DEFAULT_LEVEL;
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
|
||||
// We should only load classes from a given compression library when we actually use said compression library. This
|
||||
// is because compression libraries include native code for a set of platforms and we want to avoid errors
|
||||
|
@ -31,8 +77,65 @@ public enum CompressionType {
|
|||
// To ensure this, we only reference compression library code from classes that are only invoked when actual usage
|
||||
// happens.
|
||||
SNAPPY((byte) 2, "snappy", 1.0f),
|
||||
LZ4((byte) 3, "lz4", 1.0f),
|
||||
ZSTD((byte) 4, "zstd", 1.0f);
|
||||
LZ4((byte) 3, "lz4", 1.0f) {
|
||||
// These values come from net.jpountz.lz4.LZ4Constants
|
||||
// We may need to update them if the lz4 library changes these values.
|
||||
private static final int MIN_LEVEL = 1;
|
||||
private static final int MAX_LEVEL = 17;
|
||||
private static final int DEFAULT_LEVEL = 9;
|
||||
|
||||
@Override
|
||||
public int defaultLevel() {
|
||||
return DEFAULT_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxLevel() {
|
||||
return MAX_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int minLevel() {
|
||||
return MIN_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigDef.Validator levelValidator() {
|
||||
return between(MIN_LEVEL, MAX_LEVEL);
|
||||
}
|
||||
},
|
||||
ZSTD((byte) 4, "zstd", 1.0f) {
|
||||
// These values come from the zstd library. We don't use the Zstd.minCompressionLevel(),
|
||||
// Zstd.maxCompressionLevel() and Zstd.defaultCompressionLevel() methods to not load the Zstd library
|
||||
// while parsing configuration.
|
||||
// See ZSTD_minCLevel in https://github.com/facebook/zstd/blob/dev/lib/compress/zstd_compress.c#L6987
|
||||
// and ZSTD_TARGETLENGTH_MAX https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L1249
|
||||
private static final int MIN_LEVEL = -131072;
|
||||
// See ZSTD_MAX_CLEVEL in https://github.com/facebook/zstd/blob/dev/lib/compress/clevels.h#L19
|
||||
private static final int MAX_LEVEL = 22;
|
||||
// See ZSTD_CLEVEL_DEFAULT in https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L129
|
||||
private static final int DEFAULT_LEVEL = 3;
|
||||
|
||||
@Override
|
||||
public int defaultLevel() {
|
||||
return DEFAULT_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxLevel() {
|
||||
return MAX_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int minLevel() {
|
||||
return MIN_LEVEL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigDef.Validator levelValidator() {
|
||||
return between(MIN_LEVEL, MAX_LEVEL);
|
||||
}
|
||||
};
|
||||
|
||||
// compression type is represented by two bits in the attributes field of the record batch header, so `byte` is
|
||||
// large enough
|
||||
|
@ -78,6 +181,22 @@ public enum CompressionType {
|
|||
throw new IllegalArgumentException("Unknown compression name: " + name);
|
||||
}
|
||||
|
||||
public int defaultLevel() {
|
||||
throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name);
|
||||
}
|
||||
|
||||
public int maxLevel() {
|
||||
throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name);
|
||||
}
|
||||
|
||||
public int minLevel() {
|
||||
throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name);
|
||||
}
|
||||
|
||||
public ConfigDef.Validator levelValidator() {
|
||||
throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.kafka.common.compress;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.record.RecordBatch;
|
||||
import org.apache.kafka.common.utils.BufferSupplier;
|
||||
|
@ -30,6 +31,7 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.apache.kafka.common.record.CompressionType.GZIP;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
@ -42,7 +44,7 @@ public class GzipCompressionTest {
|
|||
byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
|
||||
for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
|
||||
for (int level : Arrays.asList(GZIP.minLevel(), GZIP.defaultLevel(), GZIP.maxLevel())) {
|
||||
GzipCompression compression = builder.level(level).build();
|
||||
ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
|
||||
try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
|
||||
|
@ -65,21 +67,21 @@ public class GzipCompressionTest {
|
|||
public void testCompressionLevels() {
|
||||
GzipCompression.Builder builder = Compression.gzip();
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(GZIP.minLevel() - 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(GZIP.maxLevel() + 1));
|
||||
|
||||
builder.level(GzipCompression.MIN_LEVEL);
|
||||
builder.level(GzipCompression.MAX_LEVEL);
|
||||
builder.level(GZIP.minLevel());
|
||||
builder.level(GZIP.maxLevel());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLevelValidator() {
|
||||
GzipCompression.LevelValidator validator = new GzipCompression.LevelValidator();
|
||||
for (int level = GzipCompression.MIN_LEVEL; level <= GzipCompression.MAX_LEVEL; level++) {
|
||||
ConfigDef.Validator validator = GZIP.levelValidator();
|
||||
for (int level = GZIP.minLevel(); level <= GZIP.maxLevel(); level++) {
|
||||
validator.ensureValid("", level);
|
||||
}
|
||||
validator.ensureValid("", GzipCompression.DEFAULT_LEVEL);
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("", GzipCompression.MIN_LEVEL - 1));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("", GzipCompression.MAX_LEVEL + 1));
|
||||
validator.ensureValid("", GZIP.defaultLevel());
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("", GZIP.minLevel() - 1));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("", GZIP.maxLevel() + 1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.util.Random;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.common.compress.Lz4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
|
||||
import static org.apache.kafka.common.record.CompressionType.LZ4;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -90,7 +91,7 @@ public class Lz4CompressionTest {
|
|||
byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
|
||||
for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL, Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL)) {
|
||||
for (int level : Arrays.asList(LZ4.minLevel(), LZ4.defaultLevel(), LZ4.maxLevel())) {
|
||||
Lz4Compression compression = builder.level(level).build();
|
||||
ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
|
||||
try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
|
||||
|
@ -113,11 +114,11 @@ public class Lz4CompressionTest {
|
|||
public void testCompressionLevels() {
|
||||
Lz4Compression.Builder builder = Compression.lz4();
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(Lz4Compression.MIN_LEVEL - 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(Lz4Compression.MAX_LEVEL + 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(LZ4.minLevel() - 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(LZ4.maxLevel() + 1));
|
||||
|
||||
builder.level(Lz4Compression.MIN_LEVEL);
|
||||
builder.level(Lz4Compression.MAX_LEVEL);
|
||||
builder.level(LZ4.minLevel());
|
||||
builder.level(LZ4.maxLevel());
|
||||
}
|
||||
|
||||
private static class Payload {
|
||||
|
@ -192,7 +193,7 @@ public class Lz4CompressionTest {
|
|||
for (boolean ignore : Arrays.asList(false, true))
|
||||
for (boolean blockChecksum : Arrays.asList(false, true))
|
||||
for (boolean close : Arrays.asList(false, true))
|
||||
for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL, Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL))
|
||||
for (int level : Arrays.asList(LZ4.minLevel(), LZ4.defaultLevel(), LZ4.maxLevel()))
|
||||
arguments.add(Arguments.of(new Args(broken, ignore, level, blockChecksum, close, payload)));
|
||||
|
||||
return arguments.stream();
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.apache.kafka.common.record.CompressionType.ZSTD;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
@ -41,7 +42,7 @@ public class ZstdCompressionTest {
|
|||
byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
|
||||
for (int level : Arrays.asList(ZstdCompression.MIN_LEVEL, ZstdCompression.DEFAULT_LEVEL, ZstdCompression.MAX_LEVEL)) {
|
||||
for (int level : Arrays.asList(ZSTD.minLevel(), ZSTD.defaultLevel(), ZSTD.maxLevel())) {
|
||||
ZstdCompression compression = builder.level(level).build();
|
||||
ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
|
||||
try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
|
||||
|
@ -64,10 +65,10 @@ public class ZstdCompressionTest {
|
|||
public void testCompressionLevels() {
|
||||
ZstdCompression.Builder builder = Compression.zstd();
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(ZstdCompression.MIN_LEVEL - 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(ZstdCompression.MAX_LEVEL + 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(ZSTD.minLevel() - 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> builder.level(ZSTD.maxLevel() + 1));
|
||||
|
||||
builder.level(ZstdCompression.MIN_LEVEL);
|
||||
builder.level(ZstdCompression.MAX_LEVEL);
|
||||
builder.level(ZSTD.minLevel());
|
||||
builder.level(ZSTD.maxLevel());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import java.nio.ByteBuffer
|
|||
import java.util.concurrent.TimeUnit
|
||||
import kafka.server.{BrokerTopicStats, RequestLocal}
|
||||
import kafka.utils.TestUtils.meterCount
|
||||
import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression}
|
||||
import org.apache.kafka.common.compress.Compression
|
||||
import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
|
||||
import org.apache.kafka.common.record._
|
||||
import org.apache.kafka.common.utils.{PrimitiveRef, Time}
|
||||
|
@ -1620,11 +1620,11 @@ class LogValidatorTest {
|
|||
List.fill(256)("data").mkString("").getBytes
|
||||
)
|
||||
// Records from the producer were created with gzip max level
|
||||
val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
|
||||
val gzipMax: Compression = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build()
|
||||
val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax)
|
||||
|
||||
// The topic is configured with gzip min level
|
||||
val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
|
||||
val gzipMin: Compression = Compression.gzip().level(CompressionType.GZIP.minLevel()).build()
|
||||
val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin)
|
||||
|
||||
// ensure data compressed with gzip max and min is different
|
||||
|
@ -1658,11 +1658,11 @@ class LogValidatorTest {
|
|||
List.fill(256)("data").mkString("").getBytes
|
||||
)
|
||||
// Records from the producer were created with gzip max level
|
||||
val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
|
||||
val gzipMax: Compression = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build()
|
||||
val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax)
|
||||
|
||||
// The topic is configured with lz4 min level
|
||||
val lz4Min: Compression = Compression.lz4().level(Lz4Compression.MIN_LEVEL).build()
|
||||
val lz4Min: Compression = Compression.lz4().level(CompressionType.LZ4.minLevel()).build()
|
||||
val recordsLz4Min = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, lz4Min)
|
||||
|
||||
val validator = new LogValidator(recordsGzipMax,
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.Sensor
|
|||
import org.apache.kafka.common.network.ListenerName
|
||||
import org.apache.kafka.common.record.{CompressionType, Records}
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.common.compress.{GzipCompression, Lz4Compression, ZstdCompression}
|
||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
|
||||
import org.apache.kafka.coordinator.group.Group.GroupType
|
||||
|
@ -769,7 +768,7 @@ class KafkaConfigTest {
|
|||
def testInvalidGzipCompressionLevel(): Unit = {
|
||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
||||
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
|
||||
props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, (GzipCompression.MAX_LEVEL + 1).toString)
|
||||
props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, (CompressionType.GZIP.maxLevel() + 1).toString)
|
||||
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
|
||||
}
|
||||
|
||||
|
@ -777,7 +776,7 @@ class KafkaConfigTest {
|
|||
def testInvalidLz4CompressionLevel(): Unit = {
|
||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
||||
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "lz4")
|
||||
props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, (Lz4Compression.MAX_LEVEL + 1).toString)
|
||||
props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, (CompressionType.LZ4.maxLevel() + 1).toString)
|
||||
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
|
||||
}
|
||||
|
||||
|
@ -785,7 +784,7 @@ class KafkaConfigTest {
|
|||
def testInvalidZstdCompressionLevel(): Unit = {
|
||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
|
||||
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "zstd")
|
||||
props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, (ZstdCompression.MAX_LEVEL + 1).toString)
|
||||
props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, (CompressionType.ZSTD.maxLevel() + 1).toString)
|
||||
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
|
||||
}
|
||||
|
||||
|
@ -999,7 +998,7 @@ class KafkaConfigTest {
|
|||
|
||||
case ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
|
||||
case ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
|
||||
case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", ZstdCompression.MAX_LEVEL + 1)
|
||||
case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", CompressionType.ZSTD.maxLevel() + 1)
|
||||
|
||||
//SSL Configs
|
||||
case BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG =>
|
||||
|
|
|
@ -136,6 +136,7 @@ versions += [
|
|||
kafka_36: "3.6.2",
|
||||
kafka_37: "3.7.1",
|
||||
kafka_38: "3.8.0",
|
||||
// When updating lz4 make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
|
||||
lz4: "1.8.0",
|
||||
mavenArtifact: "3.9.6",
|
||||
metrics: "2.2.0",
|
||||
|
@ -158,6 +159,7 @@ versions += [
|
|||
zinc: "1.9.2",
|
||||
zookeeper: "3.8.4",
|
||||
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
|
||||
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
|
||||
zstd: "1.5.6-4",
|
||||
junitPlatform: "1.10.2"
|
||||
]
|
||||
|
|
|
@ -33,13 +33,13 @@ public class BrokerCompressionTypeTest {
|
|||
|
||||
@Test
|
||||
public void testTargetCompressionType() {
|
||||
GzipCompression gzipWithLevel = Compression.gzip().level(GzipCompression.MAX_LEVEL).build();
|
||||
GzipCompression gzipWithLevel = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build();
|
||||
assertEquals(gzipWithLevel, BrokerCompressionType.targetCompression(Optional.of(gzipWithLevel), CompressionType.ZSTD));
|
||||
SnappyCompression snappy = Compression.snappy().build();
|
||||
assertEquals(snappy, BrokerCompressionType.targetCompression(Optional.of(snappy), CompressionType.LZ4));
|
||||
Lz4Compression lz4WithLevel = Compression.lz4().level(Lz4Compression.MAX_LEVEL).build();
|
||||
Lz4Compression lz4WithLevel = Compression.lz4().level(CompressionType.LZ4.maxLevel()).build();
|
||||
assertEquals(lz4WithLevel, BrokerCompressionType.targetCompression(Optional.of(lz4WithLevel), CompressionType.ZSTD));
|
||||
ZstdCompression zstdWithLevel = Compression.zstd().level(ZstdCompression.MAX_LEVEL).build();
|
||||
ZstdCompression zstdWithLevel = Compression.zstd().level(CompressionType.ZSTD.maxLevel()).build();
|
||||
assertEquals(zstdWithLevel, BrokerCompressionType.targetCompression(Optional.of(zstdWithLevel), CompressionType.GZIP));
|
||||
|
||||
GzipCompression gzip = Compression.gzip().build();
|
||||
|
|
|
@ -17,11 +17,9 @@
|
|||
package org.apache.kafka.server.config;
|
||||
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.common.compress.GzipCompression;
|
||||
import org.apache.kafka.common.compress.Lz4Compression;
|
||||
import org.apache.kafka.common.compress.ZstdCompression;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.TopicConfig;
|
||||
import org.apache.kafka.common.record.CompressionType;
|
||||
import org.apache.kafka.server.authorizer.Authorizer;
|
||||
import org.apache.kafka.server.record.BrokerCompressionType;
|
||||
import org.apache.kafka.storage.internals.log.LogConfig;
|
||||
|
@ -30,7 +28,6 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
|
|||
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Range.between;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
|
||||
|
@ -163,9 +160,9 @@ public class ServerConfigs {
|
|||
.define(CONTROLLED_SHUTDOWN_ENABLE_CONFIG, BOOLEAN, CONTROLLED_SHUTDOWN_ENABLE_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_ENABLE_DOC)
|
||||
.define(DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN, DELETE_TOPIC_ENABLE_DEFAULT, HIGH, DELETE_TOPIC_ENABLE_DOC)
|
||||
.define(COMPRESSION_TYPE_CONFIG, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, ConfigDef.ValidString.in(BrokerCompressionType.names().toArray(new String[0])), HIGH, COMPRESSION_TYPE_DOC)
|
||||
.define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
|
||||
.define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
|
||||
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
|
||||
.define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(), CompressionType.GZIP.levelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
|
||||
.define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
|
||||
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
|
||||
/** ********* Fetch Configuration **************/
|
||||
.define(MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, INT, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DEFAULT, atLeast(0), MEDIUM, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DOC)
|
||||
.define(FETCH_MAX_BYTES_CONFIG, INT, FETCH_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM, FETCH_MAX_BYTES_DOC)
|
||||
|
|
|
@ -17,9 +17,6 @@
|
|||
package org.apache.kafka.storage.internals.log;
|
||||
|
||||
import org.apache.kafka.common.compress.Compression;
|
||||
import org.apache.kafka.common.compress.GzipCompression;
|
||||
import org.apache.kafka.common.compress.Lz4Compression;
|
||||
import org.apache.kafka.common.compress.ZstdCompression;
|
||||
import org.apache.kafka.common.config.AbstractConfig;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
|
||||
|
@ -28,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.ValidList;
|
|||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.config.TopicConfig;
|
||||
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
||||
import org.apache.kafka.common.record.CompressionType;
|
||||
import org.apache.kafka.common.record.LegacyRecord;
|
||||
import org.apache.kafka.common.record.RecordVersion;
|
||||
import org.apache.kafka.common.record.Records;
|
||||
|
@ -300,12 +298,12 @@ public class LogConfig extends AbstractConfig {
|
|||
TopicConfig.MIN_IN_SYNC_REPLICAS_DOC)
|
||||
.define(TopicConfig.COMPRESSION_TYPE_CONFIG, STRING, DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names().toArray(new String[0])),
|
||||
MEDIUM, TopicConfig.COMPRESSION_TYPE_DOC)
|
||||
.define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL,
|
||||
new GzipCompression.LevelValidator(), MEDIUM, TopicConfig.COMPRESSION_GZIP_LEVEL_DOC)
|
||||
.define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL,
|
||||
between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, TopicConfig.COMPRESSION_LZ4_LEVEL_DOC)
|
||||
.define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL,
|
||||
between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC)
|
||||
.define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(),
|
||||
CompressionType.GZIP.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_GZIP_LEVEL_DOC)
|
||||
.define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(),
|
||||
CompressionType.LZ4.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_LZ4_LEVEL_DOC)
|
||||
.define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(),
|
||||
CompressionType.ZSTD.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC)
|
||||
.define(TopicConfig.PREALLOCATE_CONFIG, BOOLEAN, DEFAULT_PREALLOCATE, MEDIUM, TopicConfig.PREALLOCATE_DOC)
|
||||
.define(MESSAGE_FORMAT_VERSION_CONFIG, STRING, DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM,
|
||||
MESSAGE_FORMAT_VERSION_DOC)
|
||||
|
|
Loading…
Reference in New Issue