diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 2fa5515fb40..0e50d5d4698 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Locale;
@@ -41,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ConsumerConfigTest {
 
@@ -256,4 +259,26 @@ public class ConsumerConfigTest {
         assertEquals(configName + " cannot be set when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage());
     }
+
+    /**
+     * Validates the config/consumer.properties file to avoid getting out of sync with ConsumerConfig.
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/consumer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/consumer.properties file: " + e.getMessage());
+        }
+
+        ConsumerConfig config = new ConsumerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ConsumerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 207bac6476f..33f069c8523 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -26,14 +26,18 @@ import org.apache.kafka.common.serialization.StringSerializer;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ProducerConfigTest {
 
@@ -168,4 +172,26 @@ public class ProducerConfigTest {
         configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
         assertDoesNotThrow(() -> new ProducerConfig(configs));
     }
+
+    /**
+     * Validates the config/producer.properties file to avoid getting out of sync with ProducerConfig.
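+     * The relative path below assumes the test JVM's working directory is the module directory.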
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/producer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/producer.properties file: " + e.getMessage());
+        }
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ProducerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }
diff --git a/config/consumer.properties b/config/consumer.properties
index 01bb12eb089..f65e5299041 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -4,23 +4,135 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+# See org.apache.kafka.clients.consumer.ConsumerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.
+
+##################### Consumer Basics #####################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability.
 bootstrap.servers=localhost:9092
 
-# consumer group id
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-consumer
+
+##################### Transaction Support #####################
+
+# Isolation level for reading messages.
+# Options: read_uncommitted (default), read_committed (for exactly-once semantics).
+isolation.level=read_uncommitted
+
+##################### Consumer Group Configuration #####################
+
+# Unique identifier for this consumer group.
+# All consumers with the same group.id will share partition consumption.
 group.id=test-consumer-group
 
-# What to do when there is no initial offset in Kafka or if the current
-# offset does not exist any more on the server: latest, earliest, none
-#auto.offset.reset=
+# What to do when there is no initial offset or if the current offset no longer exists.
+# Options: earliest (from beginning), latest (from end), none (throw exception).
+# Use 'earliest' to avoid data loss on first run.
+auto.offset.reset=earliest
+
+##################### Partition Assignment Strategy #####################
+
+# Strategy for assigning partitions to consumers in a group.
+# Options: RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor.
+# CooperativeStickyAssignor is recommended (requires Kafka 2.4+).
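+# Note: moving an existing group from an eager assignor (e.g. RangeAssignor) to the
+# cooperative one requires a safe rolling-bounce procedure; see the Kafka upgrade notes.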
+partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
+
+##################### Deserialization #####################
+
+# Deserializer class for message keys.
+# Common options: StringDeserializer, ByteArrayDeserializer, AvroDeserializer.
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+# Deserializer class for message values.
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+##################### Offset Management #####################
+
+# Whether to automatically commit offsets in the background.
+# Set to false for manual offset management and exactly-once processing.
+enable.auto.commit=true
+
+# Frequency (in milliseconds) at which offsets are auto-committed.
+# Lower values provide better fault tolerance but increase broker load.
+auto.commit.interval.ms=5000
+
+##################### Classic Group Session Management #####################
+
+# Timeout for detecting consumer failures when using group management.
+# Must be between group.min.session.timeout.ms and group.max.session.timeout.ms (broker config).
+session.timeout.ms=30000
+
+# Expected time between heartbeats when using group management.
+# Should be lower than session.timeout.ms (typically 1/3 of the session timeout).
+heartbeat.interval.ms=10000
+
+# Maximum time between successive calls to poll().
+# If exceeded, the consumer is considered failed and partition rebalancing occurs.
+max.poll.interval.ms=300000
+
+##################### Retry And Error Handling #####################
+
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000
+
+# Maximum time to wait for a response to a request.
+request.timeout.ms=40000
+
+# Close idle connections after this many milliseconds.
+connections.max.idle.ms=540000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Minimum amount of data (bytes) to fetch and maximum time (ms) to wait for it.
+# Whichever condition is met first will trigger the fetch operation.
+# Balances response latency against message batching efficiency.
+# For remote partition fetching, configure remote.fetch.max.wait.ms instead.
+fetch.min.bytes=1
+fetch.max.wait.ms=500
+
+# Soft limits on the number of bytes per fetch request and per partition.
+# Both max.partition.fetch.bytes and fetch.max.bytes can be exceeded when
+# the first batch in the first non-empty partition is larger than the configured
+# value, to ensure that the consumer can make progress.
+# Configuring message.max.bytes (broker config) or max.message.bytes (topic config)
+# <= fetch.max.bytes prevents oversized fetch responses.
+fetch.max.bytes=52428800
+max.partition.fetch.bytes=1048576
+
+# Maximum number of records returned in a single poll() call.
+# Higher values increase throughput but may cause longer processing delays.
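+# All records returned by one poll() must be processed within max.poll.interval.ms,
+# so tune this limit together with that timeout to avoid unnecessary rebalances.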
+max.poll.records=500
diff --git a/config/producer.properties b/config/producer.properties
index 3a999e7c17e..6165ce9ff57 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -12,35 +12,127 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see org.apache.kafka.clients.producer.ProducerConfig for more details
 
-############################# Producer Basics #############################
+# See org.apache.kafka.clients.producer.ProducerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.
 
-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+##################### Producer Basics #####################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability.
 bootstrap.servers=localhost:9092
 
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-producer
+
+##################### Transaction Support #####################
+
+# Transactional ID for the producer.
+# Must be unique across all producer instances.
+# Enables exactly-once semantics across multiple partitions/topics.
+#transactional.id=test-transactional-id
+
+# Maximum amount of time in milliseconds that a transaction will remain open.
+# Only applies when transactional.id is set.
+transaction.timeout.ms=60000
+
+##################### Partitioning #####################
+
+# Name of the partitioner class for partitioning records.
+# The default uses "sticky" partitioning, which improves throughput by filling
+# the batches sent to each partition.
+# Options: DefaultPartitioner, RoundRobinPartitioner, UniformStickyPartitioner.
+#partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
+
+##################### Serialization #####################
+
+# Serializer class for message keys.
+# Common options: StringSerializer, ByteArraySerializer, AvroSerializer.
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+# Serializer class for message values.
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+##################### Reliability And Durability #####################
+
+# Number of acknowledgments the producer requires the leader to have received.
+# Options: 0 (no ack), 1 (leader only), all/-1 (all in-sync replicas).
+# Use 'all' for maximum durability.
+acks=all
+
+# Number of retries for failed sends.
+# Set to a high value such as Integer.MAX_VALUE for maximum reliability.
+retries=2147483647
+
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000
+
+# Enable idempotent producer to prevent duplicate messages.
+# Ensures exactly-once delivery semantics when combined with proper consumer settings.
+enable.idempotence=true
+
+# Maximum number of unacknowledged requests the client will send on a single connection.
+# Must be <= 5 when enable.idempotence=true to maintain ordering guarantees.
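+# With idempotence enabled, the broker deduplicates by producer ID and sequence number,
+# so retried batches within this window cannot introduce duplicates or reordering.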
+max.in.flight.requests.per.connection=5
+
+##################### Timeouts And Blocking #####################
+
+# Maximum amount of time the client will wait for the response of a request.
+# Should be higher than replica.lag.time.max.ms (broker config).
+request.timeout.ms=30000
+
+# How long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.
+# Should be higher than request.timeout.ms.
+max.block.ms=60000
+
+# Upper bound on the time to report success or failure after a call to send() returns,
+# including retries and time spent waiting in the send buffer.
+# Should be greater than or equal to the sum of request.timeout.ms and linger.ms.
+delivery.timeout.ms=120000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Compression codec for all data generated.
+# Options: none, gzip, snappy, lz4, zstd.
+# Can greatly improve throughput at the cost of increased CPU usage.
 compression.type=none
 
-# name of the partitioner class for partitioning records;
-# The default uses "sticky" partitioning logic which spreads the load evenly between partitions, but improves throughput by attempting to fill the batches sent to each partition.
-#partitioner.class=
+# The producer will wait up to this delay to batch records together.
+# Higher values increase throughput but add latency.
+# Set to 0 for lowest latency, 5-100ms for balanced throughput/latency.
+linger.ms=5
 
-# the maximum amount of time the client will wait for the response of a request
-#request.timeout.ms=
+# Default batch size in bytes when batching multiple records sent to a partition.
+# Larger batches improve throughput but use more memory.
+# 16KB is a good starting point; adjust based on message size and throughput needs.
+batch.size=16384
 
-# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
-#max.block.ms=
+# Total bytes of memory the producer can use to buffer records waiting to be sent.
+# Should be larger than batch.size * number of partitions you're writing to.
+# 32MB is reasonable for most use cases.
+buffer.memory=33554432
 
-# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
-#linger.ms=
-
-# the maximum size of a request in bytes
-#max.request.size=
-
-# the default batch size in bytes when batching multiple records sent to a partition
-#batch.size=
-
-# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
-#buffer.memory=
+# Maximum size of a request in bytes.
+# Should accommodate your largest batch size plus overhead.
+# 1MB is the default and suitable for most cases.
+max.request.size=1048576
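
For reference, a minimal sketch of how an application might load the shipped producer defaults at runtime; the file path and topic name below are illustrative assumptions, not part of this patch:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ProducerPropertiesExample {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Assumed path: run from the Kafka distribution root so config/ resolves.
        try (InputStream in = new FileInputStream("config/producer.properties")) {
            props.load(in);
        }
        // Serializers, acks, and batching settings all come from the file.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "test-topic" is a hypothetical topic used only for illustration.
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}

Closing the producer (here via try-with-resources) flushes any batched records before the JVM exits.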