KAFKA-10790: Add deadlock detection to producer#flush (#17946)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>, TaiJuWu <tjwu1217@gmail.com>
This commit is contained in:
TengYao Chi 2025-01-08 00:32:43 +08:00 committed by GitHub
parent af255a0c37
commit abeed20168
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 44 additions and 3 deletions

View File

@ -1181,16 +1181,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* calls made since the previous {@link #beginTransaction()} are completed before the commit.
* </p>
* <p>
* <b>Important:</b> This method should not be used within the callback provided to
* {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> in this context will cause a deadlock.
* <b>Important:</b> This method must not be called from within the callback provided to
* {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> in this context will result in a
* {@link KafkaException} being thrown, as it will cause a deadlock.
* </p>
*
* @throws InterruptException If the thread is interrupted while blocked
* @throws KafkaException If the method is invoked inside a {@link #send(ProducerRecord, Callback)} callback
*/
@Override
public void flush() {
if (Thread.currentThread() == this.ioThread) {
log.error("KafkaProducer.flush() invocation inside a callback will cause a deadlock.");
log.error("KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.");
throw new KafkaException("KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.");
}
log.trace("Flushing accumulated records in producer.");

View File

@ -2273,6 +2273,34 @@ public class KafkaProducerTest {
}
}
@Test
public void shouldNotInvokeFlushInCallback() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
// only test in idempotence disabled producer for simplicity
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
Time time = new MockTime(1);
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
MockClient client = new MockClient(time, metadata);
client.updateMetadata(initialUpdateResponse);
AtomicReference<KafkaException> kafkaException = new AtomicReference<>();
try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
producer.send(
new ProducerRecord<>("topic", "value"),
(recordMetadata, exception) -> kafkaException.set(assertThrows(KafkaException.class, producer::flush))
);
}
assertNotNull(kafkaException.get());
assertEquals("KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.",
kafkaException.get().getMessage());
}
@Test
public void negativePartitionShouldThrow() {
Map<String, Object> configs = new HashMap<>();

View File

@ -19,6 +19,16 @@
<script id="upgrade-template" type="text/x-handlebars-template">
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0 from any version 0.8.x through 4.0.x</a></h4>
<h5><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4.1.0</a></h5>
<ul>
<li><b>Producer</b>
<ul>
<li>The <code>flush</code> method now detects potential deadlocks and prohibits its use inside a callback. This change prevents unintended blocking behavior, which was a known risk in earlier versions.
</li>
</ul>
</li>
</ul>
<h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0 from any version 0.8.x through 3.9.x</a></h4>
<h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4.0.0</a></h5>
<ul>