mirror of https://github.com/apache/kafka.git
KAFKA-19574: Improve producer and consumer config files (#20302)
This is an attempt at improving the client configuration files. We now have sections and comments similar to the other properties files.

Reviewers: Kirk True <ktrue@confluent.io>, Luke Chen <showuon@gmail.com>

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
This commit is contained in:
parent 342a8e6773
commit 2ba30cc466
ConsumerConfigTest.java

@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Locale;
@@ -41,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ConsumerConfigTest {
 
@@ -256,4 +259,26 @@ public class ConsumerConfigTest {
         assertEquals(configName + " cannot be set when " +
                 ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage());
     }
+
+    /**
+     * Validates config/consumer.properties file to avoid getting out of sync with ConsumerConfig.
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/consumer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/consumer.properties file: " + e.getMessage());
+        }
+
+        ConsumerConfig config = new ConsumerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ConsumerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }
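Why the test iterates over the supplied keys instead of relying on construction alone: building a client config with an unrecognized key does not fail, since AbstractConfig only logs a warning for unknown properties. A minimal sketch of the gap the new test closes (the class name and the misspelled key are hypothetical):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class UnknownKeyDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.rest", "earliest"); // hypothetical typo: "rest" instead of "reset"

        // Succeeds despite the typo: unknown keys are logged, not rejected.
        ConsumerConfig config = new ConsumerConfig(props);

        // The new test catches exactly this kind of drift by checking every
        // supplied key against the ConfigDef.
        for (String key : config.originals().keySet()) {
            if (!ConsumerConfig.configDef().configKeys().containsKey(key)) {
                System.out.println("would fail: unknown key " + key);
            }
        }
    }
}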
ProducerConfigTest.java

@@ -26,14 +26,18 @@ import org.apache.kafka.common.serialization.StringSerializer;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ProducerConfigTest {
 
@@ -168,4 +172,26 @@ public class ProducerConfigTest {
         configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
         assertDoesNotThrow(() -> new ProducerConfig(configs));
     }
+
+    /**
+     * Validates config/producer.properties file to avoid getting out of sync with ProducerConfig.
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/producer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/producer.properties file: " + e.getMessage());
+        }
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ProducerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }
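The file this test validates can be fed straight into a client. A minimal sketch of that round trip, assuming a broker on localhost:9092 and a hypothetical topic "demo-topic" (the class name is illustrative, and the relative path assumes the current directory is the repository root):

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerFromFile {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same file the test validates; run from the repository root.
        try (InputStream in = new FileInputStream("config/producer.properties")) {
            props.load(in);
        }

        // Serializers, acks=all, and idempotence all come from the file,
        // so no further configuration is needed here.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "hello"));
            producer.flush();
        }
    }
}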
config/consumer.properties

@@ -12,15 +12,127 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
-
-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+# See org.apache.kafka.clients.consumer.ConsumerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.
+
+##################### Consumer Basics #######################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability
 bootstrap.servers=localhost:9092
 
-# consumer group id
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-consumer
+
+##################### Transaction Support #####################
+
+# Isolation level for reading messages.
+# Options: read_uncommitted (default), read_committed (for exactly-once semantics).
+isolation.level=read_uncommitted
+
+##################### Consumer Group Configuration #####################
+
+# Unique identifier for this consumer group.
+# All consumers with the same group.id will share partition consumption.
 group.id=test-consumer-group
 
-# What to do when there is no initial offset in Kafka or if the current
-# offset does not exist any more on the server: latest, earliest, none
-#auto.offset.reset=
+# What to do when there is no initial offset or if the current offset no longer exists.
+# Options: earliest (from beginning), latest (from end), none (throw exception).
+# Use 'earliest' to avoid data loss on first run.
+auto.offset.reset=earliest
+
+##################### Partition Assignment Strategy #####################
+
+# Strategy for assigning partitions to consumers in a group.
+# Options: RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor.
+# CooperativeStickyAssignor is recommended (requires Kafka 2.4+).
+partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
+
+##################### Deserialization #####################
+
+# Deserializer class for message keys.
+# Common options: StringDeserializer, ByteArrayDeserializer, AvroDeserializer.
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+# Deserializer class for message values.
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+##################### Offset Management #####################
+
+# Whether to automatically commit offsets in the background.
+# Set to false for manual offset management and exactly-once processing.
+enable.auto.commit=true
+
+# Frequency (in milliseconds) at which offsets are auto-committed.
+# Lower values provide better fault tolerance but increase broker load.
+auto.commit.interval.ms=5000
+
+##################### Classic Group Session Management #####################
+
+# Timeout for detecting consumer failures when using group management.
+# Must be between group.min.session.timeout.ms and group.max.session.timeout.ms (broker config).
+session.timeout.ms=30000
+
+# Expected time between heartbeats when using group management.
+# Should be lower than session.timeout.ms (typically 1/3 of session timeout).
+heartbeat.interval.ms=10000
+
+# Maximum time between successive calls to poll().
+# If exceeded, consumer is considered failed and partition rebalancing occurs.
+max.poll.interval.ms=300000
+
+##################### Retry And Error Handling #####################
+
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000
+
+# Total time to wait for a response to a request.
+request.timeout.ms=40000
+
+# Close idle connections after this many milliseconds.
+connections.max.idle.ms=540000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Minimum data size (bytes) and maximum polling timeout (ms).
+# Whichever condition is met first will trigger the fetch operation.
+# Balances response latency against message batching efficiency.
+# For remote partition fetching, configure remote.fetch.max.wait.ms instead.
+fetch.min.bytes=1
+fetch.max.wait.ms=500
+
+# Set soft limits to the amount of bytes per fetch request and partition.
+# Both max.partition.fetch.bytes and fetch.max.bytes limits can be exceeded when
+# the first batch in the first non-empty partition is larger than the configured
+# value to ensure that the consumer can make progress.
+# Configuring message.max.bytes (broker config) or max.message.bytes (topic config)
+# <= fetch.max.bytes prevents oversized fetch responses.
+fetch.max.bytes=52428800
+max.partition.fetch.bytes=1048576
+
+# Maximum number of records returned in a single poll() call.
+# Higher values increase throughput but may cause longer processing delays.
+max.poll.records=500
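The consumer side of the same round trip: group.id, the deserializers, and auto.offset.reset=earliest all come from the file above. A minimal sketch, again assuming a local broker and the hypothetical "demo-topic" (class name illustrative, path relative to the repository root):

import java.io.FileInputStream;
import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerFromFile {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same file the test validates; run from the repository root.
        try (InputStream in = new FileInputStream("config/consumer.properties")) {
            props.load(in);
        }

        // Nothing else is required: all settings come from the file.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}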
config/producer.properties

@@ -12,35 +12,127 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see org.apache.kafka.clients.producer.ProducerConfig for more details
-
-############################# Producer Basics #############################
-
-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+# See org.apache.kafka.clients.producer.ProducerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.
+
+##################### Producer Basics #####################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability.
 bootstrap.servers=localhost:9092
 
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-producer
+
+##################### Transaction Support #####################
+
+# Transactional ID for the producer.
+# Must be unique across all producer instances.
+# Enables exactly-once semantics across multiple partitions/topics.
+#transactional.id=test-transactional-id
+
+# Maximum amount of time in milliseconds that a transaction will remain open.
+# Only applies when transactional.id is set.
+transaction.timeout.ms=60000
+
+##################### Partitioning #####################
+
+# Name of the partitioner class for partitioning records.
+# Default uses "sticky" partitioning which improves throughput by filling batches
+# Options: DefaultPartitioner, RoundRobinPartitioner, UniformStickyPartitioner.
+#partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
+
+##################### Serialization #####################
+
+# Serializer class for message keys.
+# Common options: StringSerializer, ByteArraySerializer, AvroSerializer.
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+# Serializer class for message values.
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+##################### Reliability And Durability #####################
+
+# Number of acknowledgments the producer requires the leader to have received.
+# Options: 0 (no ack), 1 (leader only), all/-1 (all in-sync replicas).
+# Use 'all' for maximum durability.
+acks=all
+
+# Number of retries for failed sends.
+# Set to high value or Integer.MAX_VALUE for maximum reliability.
+retries=2147483647
+
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000
+
+# Enable idempotent producer to prevent duplicate messages.
+# Ensures exactly-once delivery semantics when combined with proper consumer settings.
+enable.idempotence=true
+
+# Maximum number of unacknowledged requests the client will send on a single connection.
+# Must be <= 5 when enable.idempotence=true to maintain ordering guarantees.
+max.in.flight.requests.per.connection=5
+
+##################### Timeouts And Blocking #####################
+
+# Maximum amount of time the client will wait for the response of a request.
+# Should be higher than replica.lag.time.max.ms (broker config).
+request.timeout.ms=30000
+
+# How long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.
+# Should be higher than request.timeout.ms.
+max.block.ms=60000
+
+# Timeout for broker requests, including produce requests.
+# Should be greater than or equal to the sum of request.timeout.ms and linger.ms.
+delivery.timeout.ms=120000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Compression codec for all data generated.
+# Options: none, gzip, snappy, lz4, zstd.
+# Can greatly improve throughput at the cost of increased CPU usage.
 compression.type=none
 
-# name of the partitioner class for partitioning records;
-# The default uses "sticky" partitioning logic which spreads the load evenly between partitions, but improves throughput by attempting to fill the batches sent to each partition.
-#partitioner.class=
-
-# the maximum amount of time the client will wait for the response of a request
-#request.timeout.ms=
-
-# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
-#max.block.ms=
-
-# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
-#linger.ms=
-
-# the maximum size of a request in bytes
-#max.request.size=
-
-# the default batch size in bytes when batching multiple records sent to a partition
-#batch.size=
-
-# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
-#buffer.memory=
+# Producer will wait up to this delay to batch records together.
+# Higher values increase throughput but add latency.
+# Set to 0 for lowest latency, 5-100ms for balanced throughput/latency.
+linger.ms=5
+
+# Default batch size in bytes when batching multiple records sent to a partition.
+# Larger batches improve throughput but use more memory.
+# 16KB is a good starting point, adjust based on message size and throughput needs.
+batch.size=16384
+
+# Total bytes of memory the producer can use to buffer records waiting to be sent.
+# Should be larger than batch.size * number of partitions you're writing to.
+# 32MB is reasonable for most use cases.
+buffer.memory=33554432
+
+# Maximum size of a request in bytes.
+# Should accommodate your largest batch size plus overhead.
+# 1MB is default and suitable for most cases.
+max.request.size=1048576
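The commented-out transactional.id above is the switch for exactly-once writes across partitions and topics. A minimal sketch of the transactional flow it enables, with the id set programmatically for illustration (the topic and class name are hypothetical; a local broker is assumed):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Equivalent to uncommenting transactional.id in the file;
        // the id must be unique per producer instance.
        props.put("transactional.id", "test-transactional-id");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("demo-topic", "key", "atomic write"));
                producer.commitTransaction();
            } catch (Exception e) {
                // On failure nothing from this transaction becomes visible
                // to read_committed consumers.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}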