mirror of https://github.com/apache/kafka.git
KAFKA-8862: Improve Producer error message for failed metadata update (#18587)
We should provide the same informative error message for both timeout cases. Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
c6d452b635
commit
6612dd5c0b
|
@ -1104,8 +1104,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
|
||||||
metadata.awaitUpdate(version, remainingWaitMs);
|
metadata.awaitUpdate(version, remainingWaitMs);
|
||||||
} catch (TimeoutException ex) {
|
} catch (TimeoutException ex) {
|
||||||
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
|
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
|
||||||
final String errorMessage = String.format("Topic %s not present in metadata after %d ms.",
|
final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs);
|
||||||
topic, maxWaitMs);
|
|
||||||
if (metadata.getError(topic) != null) {
|
if (metadata.getError(topic) != null) {
|
||||||
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
|
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
|
||||||
}
|
}
|
||||||
|
@ -1114,11 +1113,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
|
||||||
cluster = metadata.fetch();
|
cluster = metadata.fetch();
|
||||||
elapsed = time.milliseconds() - nowMs;
|
elapsed = time.milliseconds() - nowMs;
|
||||||
if (elapsed >= maxWaitMs) {
|
if (elapsed >= maxWaitMs) {
|
||||||
final String errorMessage = partitionsCount == null ?
|
final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs);
|
||||||
String.format("Topic %s not present in metadata after %d ms.",
|
|
||||||
topic, maxWaitMs) :
|
|
||||||
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
|
|
||||||
partition, topic, partitionsCount, maxWaitMs);
|
|
||||||
if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) {
|
if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) {
|
||||||
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
|
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
|
||||||
}
|
}
|
||||||
|
@ -1134,6 +1129,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
|
||||||
return new ClusterAndWaitTime(cluster, elapsed);
|
return new ClusterAndWaitTime(cluster, elapsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getErrorMessage(Integer partitionsCount, String topic, Integer partition, long maxWaitMs) {
|
||||||
|
return partitionsCount == null ?
|
||||||
|
String.format("Topic %s not present in metadata after %d ms.",
|
||||||
|
topic, maxWaitMs) :
|
||||||
|
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
|
||||||
|
partition, topic, partitionsCount, maxWaitMs);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Validate that the record size isn't too large
|
* Validate that the record size isn't too large
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue