From caeca090b860a200d75e0b9c7492f67259ec650a Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Mon, 15 Sep 2025 13:28:27 -0400 Subject: [PATCH] MINOR: Improve producer docs and add tests around timeout behaviour on missing topic/partition (#20533) Clarify timeout errors received on send if the case is topic not in metadata vs partition not in metadata. Add integration tests showcases the difference Follow-up from 4.1 fix for misleading timeout error message (https://issues.apache.org/jira/browse/KAFKA-8862) Reviewers: TengYao Chi , Kuan-Po Tseng --- .../kafka/clients/producer/KafkaProducer.java | 25 ++++---- .../kafka/api/BaseProducerSendTest.scala | 12 ++-- .../kafka/api/PlaintextProducerSendTest.scala | 57 ++++++++++++++++++- 3 files changed, 78 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 48be57f262b..6e656f590e4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1027,15 +1027,15 @@ public class KafkaProducer implements Producer { * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body * to parallelize processing. * - * @param record The record to send + * @param record The record to send * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null - * indicates no callback) - * - * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or - * when send is invoked after producer has been closed. - * @throws InterruptException If the thread is interrupted while blocked + * indicates no callback) + * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or + * when send is invoked after producer has been closed. + * @throws TimeoutException if the topic or the partition specified in the record cannot be found in metadata within {@code max.block.ms} + * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers - * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. + * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. */ @Override public Future send(ProducerRecord record, Callback callback) { @@ -1335,11 +1335,14 @@ public class KafkaProducer implements Producer { /** * Get the partition metadata for the given topic. This can be used for custom partitioning. + *

+ * This will attempt to refresh metadata until it finds the topic in it, or the configured {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} expires. + * * @throws AuthenticationException if authentication fails. See the exception for more details - * @throws AuthorizationException if not authorized to the specified topic. See the exception for more details - * @throws InterruptException if the thread is interrupted while blocked - * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms} - * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close + * @throws AuthorizationException if not authorized to the specified topic. See the exception for more details + * @throws InterruptException if the thread is interrupted while blocked + * @throws TimeoutException if the topic cannot be found in metadata within {@code max.block.ms} + * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close */ @Override public List partitionsFor(String topic) { diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 98155438549..add18b260cd 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -48,16 +48,20 @@ import scala.jdk.javaapi.OptionConverters abstract class BaseProducerSendTest extends KafkaServerTestHarness { def generateConfigs: scala.collection.Seq[KafkaConfig] = { - val overridingProps = new Properties() val numServers = 2 - overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort) - overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString) TestUtils.createBrokerConfigs( numServers, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties - ).map(KafkaConfig.fromProps(_, overridingProps)) + ).map(KafkaConfig.fromProps(_, brokerOverrides)) + } + + protected def brokerOverrides: Properties = { + val overridingProps = new Properties() + overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort) + overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString) + overridingProps } private var consumer: Consumer[Array[Byte], Array[Byte]] = _ diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 6e3bbf4aed7..dc8b9423304 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Record import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.server.config.ServerLogConfigs import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} @@ -37,6 +37,22 @@ import java.nio.charset.StandardCharsets class PlaintextProducerSendTest extends BaseProducerSendTest { + // topic auto creation is enabled by default, only some tests disable it + var disableAutoTopicCreation = false + + override def brokerOverrides: Properties = { + val props = super.brokerOverrides + if (disableAutoTopicCreation) { + props.put("auto.create.topics.enable", "false") + } + props + } + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { + disableAutoTopicCreation = testInfo.getDisplayName.contains("autoCreateTopicsEnabled=false") + super.setUp(testInfo) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersAll")) def testWrongSerializer(groupProtocol: String): Unit = { @@ -121,6 +137,39 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { } } + /** + * Test error message received when send fails waiting on metadata for a topic that does not exist. + * No need to run this for both rebalance protocols. + */ + @ParameterizedTest(name = "groupProtocol={0}.autoCreateTopicsEnabled={1}") + @MethodSource(Array("protocolAndAutoCreateTopicProviders")) + def testSendTimeoutErrorMessageWhenTopicDoesNotExist(groupProtocol: String, autoCreateTopicsEnabled: String): Unit = { + val producer = createProducer(maxBlockMs = 500) + val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) + val exception = assertThrows(classOf[ExecutionException], () => producer.send(record).get) + assertInstanceOf(classOf[TimeoutException], exception.getCause) + assertEquals("Topic topic not present in metadata after 500 ms.", exception.getCause.getMessage) + } + + /** + * Test error message received when send fails waiting on metadata for a partition that does not exist (topic exists). + * No need to run this for both rebalance protocols. + */ + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersClassicGroupProtocolOnly")) + def testSendTimeoutErrorWhenPartitionDoesNotExist(groupProtocol: String): Unit = { + val producer = createProducer(maxBlockMs = 500) + // Send a message to auto-create the topic + var record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) + assertEquals(0L, producer.send(record).get.offset, "Should have offset 0") + + // Send another message to the topic that exists but to a partition that does not + record = new ProducerRecord(topic, 10, "key".getBytes, "value".getBytes) + val exception = assertThrows(classOf[ExecutionException], () => producer.send(record).get) + assertInstanceOf(classOf[TimeoutException], exception.getCause) + assertEquals("Partition 10 of topic topic with partition count 4 is not present in metadata after 500 ms.", exception.getCause.getMessage) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("timestampConfigProvider")) def testSendWithInvalidBeforeAndAfterTimestamp(groupProtocol: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = { @@ -285,4 +334,10 @@ object PlaintextProducerSendTest { } data.stream() } + + def protocolAndAutoCreateTopicProviders: java.util.stream.Stream[Arguments] = { + val data = new java.util.ArrayList[Arguments]() + data.add(Arguments.of("classic", "false")) + data.stream() + } } \ No newline at end of file