MINOR: Logs warning message when user invoke producer#flush within callback (#18112)

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
TengYao Chi 2024-12-10 23:27:42 +08:00 committed by GitHub
parent b99c22770a
commit f57fd2d9fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 8 additions and 0 deletions

View File

@ -1219,11 +1219,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)} * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
* calls made since the previous {@link #beginTransaction()} are completed before the commit. * calls made since the previous {@link #beginTransaction()} are completed before the commit.
* </p> * </p>
* <p>
* <b>Important:</b> This method should not be used within the callback provided to
* {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> in this context will cause a deadlock.
* </p>
* *
* @throws InterruptException If the thread is interrupted while blocked * @throws InterruptException If the thread is interrupted while blocked
*/ */
@Override @Override
public void flush() { public void flush() {
if (Thread.currentThread() == this.ioThread) {
log.error("KafkaProducer.flush() invocation inside a callback will cause a deadlock.");
}
log.trace("Flushing accumulated records in producer."); log.trace("Flushing accumulated records in producer.");
long start = time.nanoseconds(); long start = time.nanoseconds();