MINOR: Improve producer docs and add tests around timeout behaviour on missing topic/partition (#20533)

Clarify timeout errors received on send if the case is topic not in
metadata vs partition not in metadata.  Add integration tests showcases
the difference  Follow-up from 4.1 fix for misleading timeout error
message (https://issues.apache.org/jira/browse/KAFKA-8862)

Reviewers: TengYao Chi <frankvicky@apache.org>, Kuan-Po Tseng
 <brandboat@gmail.com>
This commit is contained in:
Lianet Magrans 2025-09-15 13:28:27 -04:00 committed by GitHub
parent 3fcc0c2877
commit caeca090b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 78 additions and 16 deletions

View File

@ -1027,15 +1027,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
* to parallelize processing.
*
* @param record The record to send
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
*
* @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
* when send is invoked after producer has been closed.
* @throws InterruptException If the thread is interrupted while blocked
* indicates no callback)
* @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
* when send is invoked after producer has been closed.
* @throws TimeoutException if the topic or the partition specified in the record cannot be found in metadata within {@code max.block.ms}
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@ -1335,11 +1335,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
/**
* Get the partition metadata for the given topic. This can be used for custom partitioning.
* <p/>
* This will attempt to refresh metadata until it finds the topic in it, or the configured {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} expires.
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws InterruptException if the thread is interrupted while blocked
* @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
* @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws InterruptException if the thread is interrupted while blocked
* @throws TimeoutException if the topic cannot be found in metadata within {@code max.block.ms}
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {

View File

@ -48,16 +48,20 @@ import scala.jdk.javaapi.OptionConverters
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def generateConfigs: scala.collection.Seq[KafkaConfig] = {
val overridingProps = new Properties()
val numServers = 2
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort)
overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
TestUtils.createBrokerConfigs(
numServers,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile,
saslProperties = serverSaslProperties
).map(KafkaConfig.fromProps(_, overridingProps))
).map(KafkaConfig.fromProps(_, brokerOverrides))
}
protected def brokerOverrides: Properties = {
val overridingProps = new Properties()
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort)
overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
overridingProps
}
private var consumer: Consumer[Array[Byte], Array[Byte]] = _

View File

@ -28,7 +28,7 @@ import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Record
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@ -37,6 +37,22 @@ import java.nio.charset.StandardCharsets
class PlaintextProducerSendTest extends BaseProducerSendTest {
// topic auto creation is enabled by default, only some tests disable it
var disableAutoTopicCreation = false
override def brokerOverrides: Properties = {
val props = super.brokerOverrides
if (disableAutoTopicCreation) {
props.put("auto.create.topics.enable", "false")
}
props
}
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
disableAutoTopicCreation = testInfo.getDisplayName.contains("autoCreateTopicsEnabled=false")
super.setUp(testInfo)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testWrongSerializer(groupProtocol: String): Unit = {
@ -121,6 +137,39 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
}
}
/**
* Test error message received when send fails waiting on metadata for a topic that does not exist.
* No need to run this for both rebalance protocols.
*/
@ParameterizedTest(name = "groupProtocol={0}.autoCreateTopicsEnabled={1}")
@MethodSource(Array("protocolAndAutoCreateTopicProviders"))
def testSendTimeoutErrorMessageWhenTopicDoesNotExist(groupProtocol: String, autoCreateTopicsEnabled: String): Unit = {
val producer = createProducer(maxBlockMs = 500)
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
val exception = assertThrows(classOf[ExecutionException], () => producer.send(record).get)
assertInstanceOf(classOf[TimeoutException], exception.getCause)
assertEquals("Topic topic not present in metadata after 500 ms.", exception.getCause.getMessage)
}
/**
* Test error message received when send fails waiting on metadata for a partition that does not exist (topic exists).
* No need to run this for both rebalance protocols.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersClassicGroupProtocolOnly"))
def testSendTimeoutErrorWhenPartitionDoesNotExist(groupProtocol: String): Unit = {
val producer = createProducer(maxBlockMs = 500)
// Send a message to auto-create the topic
var record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
// Send another message to the topic that exists but to a partition that does not
record = new ProducerRecord(topic, 10, "key".getBytes, "value".getBytes)
val exception = assertThrows(classOf[ExecutionException], () => producer.send(record).get)
assertInstanceOf(classOf[TimeoutException], exception.getCause)
assertEquals("Partition 10 of topic topic with partition count 4 is not present in metadata after 500 ms.", exception.getCause.getMessage)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("timestampConfigProvider"))
def testSendWithInvalidBeforeAndAfterTimestamp(groupProtocol: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
@ -285,4 +334,10 @@ object PlaintextProducerSendTest {
}
data.stream()
}
def protocolAndAutoCreateTopicProviders: java.util.stream.Stream[Arguments] = {
val data = new java.util.ArrayList[Arguments]()
data.add(Arguments.of("classic", "false"))
data.stream()
}
}