diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3f8212f9976..abf98458005 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -76,7 +76,10 @@ - + + + + @@ -150,6 +153,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index acfb760fbab..8b293395313 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -19,9 +19,6 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataRecoveryStrategy; -import org.apache.kafka.common.compress.GzipCompression; -import org.apache.kafka.common.compress.Lz4Compression; -import org.apache.kafka.common.compress.ZstdCompression; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -381,9 +378,9 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC) - .define(COMPRESSION_GZIP_LEVEL_CONFIG, Type.INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC) - .define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC) - .define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC) + .define(COMPRESSION_GZIP_LEVEL_CONFIG, Type.INT, CompressionType.GZIP.defaultLevel(), CompressionType.GZIP.levelValidator(), Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC) + .define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC) + .define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) .define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC) .define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java b/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java index 52e38700c46..e900359530a 100644 --- a/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java @@ -17,8 +17,6 @@ package org.apache.kafka.common.compress; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferInputStream; @@ -30,14 +28,11 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; -import java.util.zip.Deflater; import java.util.zip.GZIPInputStream; -public class GzipCompression implements Compression { +import static org.apache.kafka.common.record.CompressionType.GZIP; - public static final int MIN_LEVEL = Deflater.BEST_SPEED; - public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION; - public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION; +public class GzipCompression implements Compression { private final int level; @@ -47,7 +42,7 @@ public class GzipCompression implements Compression { @Override public CompressionType type() { - return CompressionType.GZIP; + return GZIP; } @Override @@ -101,10 +96,10 @@ public class GzipCompression implements Compression { } public static class Builder implements Compression.Builder { - private int level = DEFAULT_LEVEL; + private int level = GZIP.defaultLevel(); public Builder level(int level) { - if ((level < MIN_LEVEL || MAX_LEVEL < level) && level != DEFAULT_LEVEL) { + if ((level < GZIP.minLevel() || GZIP.maxLevel() < level) && level != GZIP.defaultLevel()) { throw new IllegalArgumentException("gzip doesn't support given compression level: " + level); } @@ -117,22 +112,4 @@ public class GzipCompression implements Compression { return new GzipCompression(level); } } - - public static class LevelValidator implements ConfigDef.Validator { - - @Override - public void ensureValid(String name, Object o) { - if (o == null) - throw new ConfigException(name, null, "Value must be non-null"); - int level = ((Number) o).intValue(); - if (level > MAX_LEVEL || (level < MIN_LEVEL && level != DEFAULT_LEVEL)) { - throw new ConfigException(name, o, "Value must be between " + MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL); - } - } - - @Override - public String toString() { - return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " + DEFAULT_LEVEL; - } - } } diff --git a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java index e55b84e286c..10a0a81b1d7 100644 --- a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.compress; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.ByteUtils; import net.jpountz.lz4.LZ4Compressor; @@ -75,7 +76,7 @@ public final class Lz4BlockOutputStream extends OutputStream { * * For backward compatibility, Lz4BlockOutputStream uses fastCompressor with default compression level but, with the other level, it uses highCompressor. */ - compressor = level == Lz4Compression.DEFAULT_LEVEL ? LZ4Factory.fastestInstance().fastCompressor() : LZ4Factory.fastestInstance().highCompressor(level); + compressor = level == CompressionType.LZ4.defaultLevel() ? LZ4Factory.fastestInstance().fastCompressor() : LZ4Factory.fastestInstance().highCompressor(level); checksum = XXHashFactory.fastestInstance().hash32(); this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum; bd = new BD(blockSize); diff --git a/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java b/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java index 42c1a1a1417..f3e80447a14 100644 --- a/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java @@ -28,13 +28,9 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; -public class Lz4Compression implements Compression { +import static org.apache.kafka.common.record.CompressionType.LZ4; - // These values come from net.jpountz.lz4.LZ4Constants - // We may need to update them if the lz4 library changes these values. - public static final int MIN_LEVEL = 1; - public static final int MAX_LEVEL = 17; - public static final int DEFAULT_LEVEL = 9; +public class Lz4Compression implements Compression { private final int level; @@ -44,7 +40,7 @@ public class Lz4Compression implements Compression { @Override public CompressionType type() { - return CompressionType.LZ4; + return LZ4; } @Override @@ -89,10 +85,10 @@ public class Lz4Compression implements Compression { } public static class Builder implements Compression.Builder { - private int level = DEFAULT_LEVEL; + private int level = LZ4.defaultLevel(); public Builder level(int level) { - if (level < MIN_LEVEL || MAX_LEVEL < level) { + if (level < LZ4.minLevel() || LZ4.maxLevel() < level) { throw new IllegalArgumentException("lz4 doesn't support given compression level: " + level); } diff --git a/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java b/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java index 4ddf23e78de..728797e6a82 100644 --- a/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.ChunkedBytesStream; import com.github.luben.zstd.BufferPool; import com.github.luben.zstd.RecyclingBufferPool; -import com.github.luben.zstd.Zstd; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; @@ -37,11 +36,9 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; -public class ZstdCompression implements Compression { +import static org.apache.kafka.common.record.CompressionType.ZSTD; - public static final int MIN_LEVEL = Zstd.minCompressionLevel(); - public static final int MAX_LEVEL = Zstd.maxCompressionLevel(); - public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel(); +public class ZstdCompression implements Compression { private final int level; @@ -51,7 +48,7 @@ public class ZstdCompression implements Compression { @Override public CompressionType type() { - return CompressionType.ZSTD; + return ZSTD; } @Override @@ -125,10 +122,10 @@ public class ZstdCompression implements Compression { } public static class Builder implements Compression.Builder { - private int level = DEFAULT_LEVEL; + private int level = ZSTD.defaultLevel(); public Builder level(int level) { - if (MAX_LEVEL < level || level < MIN_LEVEL) { + if (level < ZSTD.minLevel() || ZSTD.maxLevel() < level) { throw new IllegalArgumentException("zstd doesn't support given compression level: " + level); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 12efafc8b55..05b8c45358b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -16,6 +16,13 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.util.zip.Deflater; + +import static org.apache.kafka.common.config.ConfigDef.Range.between; + /** * The compression type to use */ @@ -23,7 +30,46 @@ public enum CompressionType { NONE((byte) 0, "none", 1.0f), // Shipped with the JDK - GZIP((byte) 1, "gzip", 1.0f), + GZIP((byte) 1, "gzip", 1.0f) { + public static final int MIN_LEVEL = Deflater.BEST_SPEED; + public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION; + public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION; + + @Override + public int defaultLevel() { + return DEFAULT_LEVEL; + } + + @Override + public int maxLevel() { + return MAX_LEVEL; + } + + @Override + public int minLevel() { + return MIN_LEVEL; + } + + @Override + public ConfigDef.Validator levelValidator() { + return new ConfigDef.Validator() { + @Override + public void ensureValid(String name, Object o) { + if (o == null) + throw new ConfigException(name, null, "Value must be non-null"); + int level = ((Number) o).intValue(); + if (level > MAX_LEVEL || (level < MIN_LEVEL && level != DEFAULT_LEVEL)) { + throw new ConfigException(name, o, "Value must be between " + MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL); + } + } + + @Override + public String toString() { + return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " + DEFAULT_LEVEL; + } + }; + } + }, // We should only load classes from a given compression library when we actually use said compression library. This // is because compression libraries include native code for a set of platforms and we want to avoid errors @@ -31,8 +77,65 @@ public enum CompressionType { // To ensure this, we only reference compression library code from classes that are only invoked when actual usage // happens. SNAPPY((byte) 2, "snappy", 1.0f), - LZ4((byte) 3, "lz4", 1.0f), - ZSTD((byte) 4, "zstd", 1.0f); + LZ4((byte) 3, "lz4", 1.0f) { + // These values come from net.jpountz.lz4.LZ4Constants + // We may need to update them if the lz4 library changes these values. + private static final int MIN_LEVEL = 1; + private static final int MAX_LEVEL = 17; + private static final int DEFAULT_LEVEL = 9; + + @Override + public int defaultLevel() { + return DEFAULT_LEVEL; + } + + @Override + public int maxLevel() { + return MAX_LEVEL; + } + + @Override + public int minLevel() { + return MIN_LEVEL; + } + + @Override + public ConfigDef.Validator levelValidator() { + return between(MIN_LEVEL, MAX_LEVEL); + } + }, + ZSTD((byte) 4, "zstd", 1.0f) { + // These values come from the zstd library. We don't use the Zstd.minCompressionLevel(), + // Zstd.maxCompressionLevel() and Zstd.defaultCompressionLevel() methods to not load the Zstd library + // while parsing configuration. + // See ZSTD_minCLevel in https://github.com/facebook/zstd/blob/dev/lib/compress/zstd_compress.c#L6987 + // and ZSTD_TARGETLENGTH_MAX https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L1249 + private static final int MIN_LEVEL = -131072; + // See ZSTD_MAX_CLEVEL in https://github.com/facebook/zstd/blob/dev/lib/compress/clevels.h#L19 + private static final int MAX_LEVEL = 22; + // See ZSTD_CLEVEL_DEFAULT in https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L129 + private static final int DEFAULT_LEVEL = 3; + + @Override + public int defaultLevel() { + return DEFAULT_LEVEL; + } + + @Override + public int maxLevel() { + return MAX_LEVEL; + } + + @Override + public int minLevel() { + return MIN_LEVEL; + } + + @Override + public ConfigDef.Validator levelValidator() { + return between(MIN_LEVEL, MAX_LEVEL); + } + }; // compression type is represented by two bits in the attributes field of the record batch header, so `byte` is // large enough @@ -78,6 +181,22 @@ public enum CompressionType { throw new IllegalArgumentException("Unknown compression name: " + name); } + public int defaultLevel() { + throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name); + } + + public int maxLevel() { + throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name); + } + + public int minLevel() { + throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name); + } + + public ConfigDef.Validator levelValidator() { + throw new UnsupportedOperationException("Compression levels are not defined for this compression type: " + name); + } + @Override public String toString() { return name; diff --git a/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java b/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java index 2837d96a5d0..660ba02a877 100644 --- a/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.compress; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.utils.BufferSupplier; @@ -30,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import static org.apache.kafka.common.record.CompressionType.GZIP; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -42,7 +44,7 @@ public class GzipCompressionTest { byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8); for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { - for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) { + for (int level : Arrays.asList(GZIP.minLevel(), GZIP.defaultLevel(), GZIP.maxLevel())) { GzipCompression compression = builder.level(level).build(); ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4); try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) { @@ -65,21 +67,21 @@ public class GzipCompressionTest { public void testCompressionLevels() { GzipCompression.Builder builder = Compression.gzip(); - assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1)); - assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1)); + assertThrows(IllegalArgumentException.class, () -> builder.level(GZIP.minLevel() - 1)); + assertThrows(IllegalArgumentException.class, () -> builder.level(GZIP.maxLevel() + 1)); - builder.level(GzipCompression.MIN_LEVEL); - builder.level(GzipCompression.MAX_LEVEL); + builder.level(GZIP.minLevel()); + builder.level(GZIP.maxLevel()); } @Test public void testLevelValidator() { - GzipCompression.LevelValidator validator = new GzipCompression.LevelValidator(); - for (int level = GzipCompression.MIN_LEVEL; level <= GzipCompression.MAX_LEVEL; level++) { + ConfigDef.Validator validator = GZIP.levelValidator(); + for (int level = GZIP.minLevel(); level <= GZIP.maxLevel(); level++) { validator.ensureValid("", level); } - validator.ensureValid("", GzipCompression.DEFAULT_LEVEL); - assertThrows(ConfigException.class, () -> validator.ensureValid("", GzipCompression.MIN_LEVEL - 1)); - assertThrows(ConfigException.class, () -> validator.ensureValid("", GzipCompression.MAX_LEVEL + 1)); + validator.ensureValid("", GZIP.defaultLevel()); + assertThrows(ConfigException.class, () -> validator.ensureValid("", GZIP.minLevel() - 1)); + assertThrows(ConfigException.class, () -> validator.ensureValid("", GZIP.maxLevel() + 1)); } } diff --git a/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java b/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java index a4f1152830d..e0a1a7aab94 100644 --- a/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java @@ -45,6 +45,7 @@ import java.util.Random; import java.util.stream.Stream; import static org.apache.kafka.common.compress.Lz4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.apache.kafka.common.record.CompressionType.LZ4; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -90,7 +91,7 @@ public class Lz4CompressionTest { byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8); for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { - for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL, Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL)) { + for (int level : Arrays.asList(LZ4.minLevel(), LZ4.defaultLevel(), LZ4.maxLevel())) { Lz4Compression compression = builder.level(level).build(); ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4); try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) { @@ -113,11 +114,11 @@ public class Lz4CompressionTest { public void testCompressionLevels() { Lz4Compression.Builder builder = Compression.lz4(); - assertThrows(IllegalArgumentException.class, () -> builder.level(Lz4Compression.MIN_LEVEL - 1)); - assertThrows(IllegalArgumentException.class, () -> builder.level(Lz4Compression.MAX_LEVEL + 1)); + assertThrows(IllegalArgumentException.class, () -> builder.level(LZ4.minLevel() - 1)); + assertThrows(IllegalArgumentException.class, () -> builder.level(LZ4.maxLevel() + 1)); - builder.level(Lz4Compression.MIN_LEVEL); - builder.level(Lz4Compression.MAX_LEVEL); + builder.level(LZ4.minLevel()); + builder.level(LZ4.maxLevel()); } private static class Payload { @@ -192,7 +193,7 @@ public class Lz4CompressionTest { for (boolean ignore : Arrays.asList(false, true)) for (boolean blockChecksum : Arrays.asList(false, true)) for (boolean close : Arrays.asList(false, true)) - for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL, Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL)) + for (int level : Arrays.asList(LZ4.minLevel(), LZ4.defaultLevel(), LZ4.maxLevel())) arguments.add(Arguments.of(new Args(broken, ignore, level, blockChecksum, close, payload))); return arguments.stream(); diff --git a/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java b/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java index 176291b8c56..30ace681b08 100644 --- a/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java @@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import static org.apache.kafka.common.record.CompressionType.ZSTD; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -41,7 +42,7 @@ public class ZstdCompressionTest { byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8); for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { - for (int level : Arrays.asList(ZstdCompression.MIN_LEVEL, ZstdCompression.DEFAULT_LEVEL, ZstdCompression.MAX_LEVEL)) { + for (int level : Arrays.asList(ZSTD.minLevel(), ZSTD.defaultLevel(), ZSTD.maxLevel())) { ZstdCompression compression = builder.level(level).build(); ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4); try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) { @@ -64,10 +65,10 @@ public class ZstdCompressionTest { public void testCompressionLevels() { ZstdCompression.Builder builder = Compression.zstd(); - assertThrows(IllegalArgumentException.class, () -> builder.level(ZstdCompression.MIN_LEVEL - 1)); - assertThrows(IllegalArgumentException.class, () -> builder.level(ZstdCompression.MAX_LEVEL + 1)); + assertThrows(IllegalArgumentException.class, () -> builder.level(ZSTD.minLevel() - 1)); + assertThrows(IllegalArgumentException.class, () -> builder.level(ZSTD.maxLevel() + 1)); - builder.level(ZstdCompression.MIN_LEVEL); - builder.level(ZstdCompression.MAX_LEVEL); + builder.level(ZSTD.minLevel()); + builder.level(ZSTD.maxLevel()); } } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index ef80d5167a3..ea2349059b0 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -20,7 +20,7 @@ import java.nio.ByteBuffer import java.util.concurrent.TimeUnit import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.TestUtils.meterCount -import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression} +import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{PrimitiveRef, Time} @@ -1620,11 +1620,11 @@ class LogValidatorTest { List.fill(256)("data").mkString("").getBytes ) // Records from the producer were created with gzip max level - val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build() + val gzipMax: Compression = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build() val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax) // The topic is configured with gzip min level - val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build() + val gzipMin: Compression = Compression.gzip().level(CompressionType.GZIP.minLevel()).build() val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin) // ensure data compressed with gzip max and min is different @@ -1658,11 +1658,11 @@ class LogValidatorTest { List.fill(256)("data").mkString("").getBytes ) // Records from the producer were created with gzip max level - val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build() + val gzipMax: Compression = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build() val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax) // The topic is configured with lz4 min level - val lz4Min: Compression = Compression.lz4().level(Lz4Compression.MIN_LEVEL).build() + val lz4Min: Compression = Compression.lz4().level(CompressionType.LZ4.minLevel()).build() val recordsLz4Min = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, lz4Min) val validator = new LogValidator(recordsGzipMax, diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 6a8d4a6564a..20555da96f6 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.{CompressionType, Records} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.compress.{GzipCompression, Lz4Compression, ZstdCompression} import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType @@ -769,7 +768,7 @@ class KafkaConfigTest { def testInvalidGzipCompressionLevel(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip") - props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, (GzipCompression.MAX_LEVEL + 1).toString) + props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, (CompressionType.GZIP.maxLevel() + 1).toString) assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) } @@ -777,7 +776,7 @@ class KafkaConfigTest { def testInvalidLz4CompressionLevel(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "lz4") - props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, (Lz4Compression.MAX_LEVEL + 1).toString) + props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, (CompressionType.LZ4.maxLevel() + 1).toString) assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) } @@ -785,7 +784,7 @@ class KafkaConfigTest { def testInvalidZstdCompressionLevel(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "zstd") - props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, (ZstdCompression.MAX_LEVEL + 1).toString) + props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, (CompressionType.ZSTD.maxLevel() + 1).toString) assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) } @@ -999,7 +998,7 @@ class KafkaConfigTest { case ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") - case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", ZstdCompression.MAX_LEVEL + 1) + case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", CompressionType.ZSTD.maxLevel() + 1) //SSL Configs case BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG => diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index c67167731e7..8ca49dd674b 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -136,6 +136,7 @@ versions += [ kafka_36: "3.6.2", kafka_37: "3.7.1", kafka_38: "3.8.0", + // When updating lz4 make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid lz4: "1.8.0", mavenArtifact: "3.9.6", metrics: "2.2.0", @@ -158,6 +159,7 @@ versions += [ zinc: "1.9.2", zookeeper: "3.8.4", // When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json + // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid zstd: "1.5.6-4", junitPlatform: "1.10.2" ] diff --git a/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java b/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java index ae6c3d76e3a..96f8a09d33a 100644 --- a/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java @@ -33,13 +33,13 @@ public class BrokerCompressionTypeTest { @Test public void testTargetCompressionType() { - GzipCompression gzipWithLevel = Compression.gzip().level(GzipCompression.MAX_LEVEL).build(); + GzipCompression gzipWithLevel = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build(); assertEquals(gzipWithLevel, BrokerCompressionType.targetCompression(Optional.of(gzipWithLevel), CompressionType.ZSTD)); SnappyCompression snappy = Compression.snappy().build(); assertEquals(snappy, BrokerCompressionType.targetCompression(Optional.of(snappy), CompressionType.LZ4)); - Lz4Compression lz4WithLevel = Compression.lz4().level(Lz4Compression.MAX_LEVEL).build(); + Lz4Compression lz4WithLevel = Compression.lz4().level(CompressionType.LZ4.maxLevel()).build(); assertEquals(lz4WithLevel, BrokerCompressionType.targetCompression(Optional.of(lz4WithLevel), CompressionType.ZSTD)); - ZstdCompression zstdWithLevel = Compression.zstd().level(ZstdCompression.MAX_LEVEL).build(); + ZstdCompression zstdWithLevel = Compression.zstd().level(CompressionType.ZSTD.maxLevel()).build(); assertEquals(zstdWithLevel, BrokerCompressionType.targetCompression(Optional.of(zstdWithLevel), CompressionType.GZIP)); GzipCompression gzip = Compression.gzip().build(); diff --git a/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index fc39e451481..883eaf19feb 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -17,11 +17,9 @@ package org.apache.kafka.server.config; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.compress.GzipCompression; -import org.apache.kafka.common.compress.Lz4Compression; -import org.apache.kafka.common.compress.ZstdCompression; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.storage.internals.log.LogConfig; @@ -30,7 +28,6 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; -import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.LONG; @@ -163,9 +160,9 @@ public class ServerConfigs { .define(CONTROLLED_SHUTDOWN_ENABLE_CONFIG, BOOLEAN, CONTROLLED_SHUTDOWN_ENABLE_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_ENABLE_DOC) .define(DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN, DELETE_TOPIC_ENABLE_DEFAULT, HIGH, DELETE_TOPIC_ENABLE_DOC) .define(COMPRESSION_TYPE_CONFIG, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, ConfigDef.ValidString.in(BrokerCompressionType.names().toArray(new String[0])), HIGH, COMPRESSION_TYPE_DOC) - .define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC) - .define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC) - .define(COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC) + .define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(), CompressionType.GZIP.levelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC) + .define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC) + .define(COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC) /** ********* Fetch Configuration **************/ .define(MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, INT, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DEFAULT, atLeast(0), MEDIUM, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DOC) .define(FETCH_MAX_BYTES_CONFIG, INT, FETCH_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM, FETCH_MAX_BYTES_DOC) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index a0ca52678ad..267b4f15eb0 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -17,9 +17,6 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.compress.Compression; -import org.apache.kafka.common.compress.GzipCompression; -import org.apache.kafka.common.compress.Lz4Compression; -import org.apache.kafka.common.compress.ZstdCompression; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.ConfigKey; @@ -28,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.ValidList; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.LegacyRecord; import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; @@ -300,12 +298,12 @@ public class LogConfig extends AbstractConfig { TopicConfig.MIN_IN_SYNC_REPLICAS_DOC) .define(TopicConfig.COMPRESSION_TYPE_CONFIG, STRING, DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names().toArray(new String[0])), MEDIUM, TopicConfig.COMPRESSION_TYPE_DOC) - .define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL, - new GzipCompression.LevelValidator(), MEDIUM, TopicConfig.COMPRESSION_GZIP_LEVEL_DOC) - .define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL, - between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, TopicConfig.COMPRESSION_LZ4_LEVEL_DOC) - .define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL, - between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC) + .define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(), + CompressionType.GZIP.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_GZIP_LEVEL_DOC) + .define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(), + CompressionType.LZ4.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_LZ4_LEVEL_DOC) + .define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(), + CompressionType.ZSTD.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC) .define(TopicConfig.PREALLOCATE_CONFIG, BOOLEAN, DEFAULT_PREALLOCATE, MEDIUM, TopicConfig.PREALLOCATE_DOC) .define(MESSAGE_FORMAT_VERSION_CONFIG, STRING, DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM, MESSAGE_FORMAT_VERSION_DOC)