MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)

Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to verify whether the wrong value of compression.type will throw a ConfigException.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
RivenSun 2022-04-13 12:24:57 +08:00 committed by GitHub
parent 0d518aaed1
commit 1df232c839
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 5 deletions

View File

@ -26,7 +26,9 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -329,7 +331,7 @@ public class ProducerConfig extends AbstractConfig {
in("all", "-1", "0", "1"),
Importance.LOW,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)

View File

@ -190,4 +190,10 @@ public enum CompressionType {
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}
@Override
public String toString() {
return name;
}
}

View File

@ -19,12 +19,14 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ProducerConfigTest {
@ -59,4 +61,13 @@ public class ProducerConfigTest {
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass);
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
}
@Test
public void testInvalidCompressionType() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "abc");
assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
}
}

View File

@ -25,7 +25,7 @@ import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.log.LogConfig
import kafka.log.LogConfig.MessageFormatVersion
import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
@ -227,7 +227,7 @@ object Defaults {
val DeleteTopicEnable = true
val CompressionType = "producer"
val CompressionType = ProducerCompressionCodec.name
val MaxIdMapSnapshots = 2
/** ********* Kafka Metrics Configuration ***********/
@ -1257,7 +1257,7 @@ object KafkaConfig {
.define(OffsetCommitTimeoutMsProp, INT, Defaults.OffsetCommitTimeoutMs, atLeast(1), HIGH, OffsetCommitTimeoutMsDoc)
.define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc)
.define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc)
.define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc)
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), HIGH, CompressionTypeDoc)
/** ********* Transaction management configuration ***********/
.define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMs, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)

View File

@ -657,7 +657,7 @@ class KafkaConfigTest {
def testInvalidCompressionType(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaConfig.CompressionTypeProp, "abc")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test