From 17846fe743aabedd2b556a0331731a5e899e1d65 Mon Sep 17 00:00:00 2001 From: Okada Haruki Date: Fri, 24 Jan 2025 21:23:43 +0900 Subject: [PATCH] KAFKA-16372 Fix producer doc discrepancy with the exception behavior (#15574) Currently, Producer.send doc is inconsistent with actual exception behavior - TimeoutException: This won't be thrown from send on buffer-full or metadata-missing actually. Instead, it will returned as failed future. - AuthenticationException/AuthorizationException: These exceptions are also won't be thrown. Returned with failed future actually. Fixed Callback javadoc and ProducerConfig doc as well. Reviewers: Luke Chen , Andrew Schofield --- .../java/org/apache/kafka/clients/producer/Callback.java | 3 +++ .../org/apache/kafka/clients/producer/KafkaProducer.java | 8 ++------ .../org/apache/kafka/clients/producer/ProducerConfig.java | 6 +++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java index 29acb88044b..2eb7ebc882b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java @@ -42,6 +42,8 @@ public interface Callback { *
  • {@link org.apache.kafka.common.errors.UnknownServerException UnknownServerException} *
  • {@link org.apache.kafka.common.errors.UnknownProducerIdException UnknownProducerIdException} *
  • {@link org.apache.kafka.common.errors.InvalidProducerEpochException InvalidProducerEpochException} + *
  • {@link org.apache.kafka.common.errors.AuthenticationException AuthenticationException} + *
  • {@link org.apache.kafka.common.errors.AuthorizationException AuthorizationException} * * Retriable exceptions (transient, may be covered by increasing #.retries): *
      @@ -52,6 +54,7 @@ public interface Callback { *
    • {@link org.apache.kafka.common.errors.OffsetOutOfRangeException OffsetOutOfRangeException} *
    • {@link org.apache.kafka.common.errors.TimeoutException TimeoutException} *
    • {@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException UnknownTopicOrPartitionException} + *
    • {@link org.apache.kafka.clients.producer.BufferExhaustedException BufferExhaustedException} *
    */ void onCompletion(RecordMetadata metadata, Exception exception); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 64930b36e27..130cfcb7bfb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -149,8 +149,8 @@ import java.util.concurrent.atomic.AtomicReference; *

    * The buffer.memory controls the total amount of memory available to the producer for buffering. If records * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is - * exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws - * a TimeoutException. + * exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it returns + * a failed future with BufferExhaustedException. *

    * The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with * their ProducerRecord into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or @@ -926,14 +926,10 @@ public class KafkaProducer implements Producer { * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) * - * @throws AuthenticationException if authentication fails. See the exception for more details - * @throws AuthorizationException fatal error indicating that the producer is not allowed to write * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or * when send is invoked after producer has been closed. * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers - * @throws TimeoutException If the record could not be appended to the send buffer due to memory unavailable - * or missing metadata within {@code max.block.ms}. * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. */ @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 23dd02bda98..295feee8240 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -209,7 +209,7 @@ public class ProducerConfig extends AbstractConfig { /** buffer.memory */ public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " - + "sent faster than they can be delivered to the server the producer will block for " + MAX_BLOCK_MS_CONFIG + " after which it will throw an exception." + + "sent faster than they can be delivered to the server the producer will block for " + MAX_BLOCK_MS_CONFIG + " after which it will fail with an exception." + "

    " + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since " + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if " @@ -309,8 +309,8 @@ public class ProducerConfig extends AbstractConfig { public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "Determines which partition to send a record to when records are produced. Available options are:" + "

      " + - "
    • If not set, the default partitioning logic is used. " + - "This strategy send records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" + + "
    • If not set, the default partitioning logic is used. " + + "This strategy send records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" + "
        " + "
      1. If no partition is specified but a key is present, choose a partition based on a hash of the key.
      2. " + "
      3. If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.
      4. " +