KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes (#10620)

When flush is called a copy of incomplete batches is made. This
means that the full ProducerBatch(s) are held in memory until the flush
has completed. Note that the `Sender` removes producer batches
from the original incomplete collection when they're no longer
needed.

For batches where the existing memory pool is used this
is not as wasteful as the memory will be returned to the pool,
but for non pool memory it can only be GC'd after the flush has
completed. Rather than use copyAll we can make a new array with only the
produceFuture(s) and await on those.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Lucas Bradstreet 2021-05-17 06:34:32 -07:00 committed by GitHub
parent e35f5c88b1
commit fe16912dfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 2 deletions

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer.internals;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
/* /*
* A thread-safe helper class to hold batches that haven't been acknowledged yet (including those * A thread-safe helper class to hold batches that haven't been acknowledged yet (including those
@ -51,6 +52,12 @@ class IncompleteBatches {
} }
} }
public Iterable<ProduceRequestResult> requestResults() {
synchronized (incomplete) {
return incomplete.stream().map(batch -> batch.produceFuture).collect(Collectors.toList());
}
}
public boolean isEmpty() { public boolean isEmpty() {
synchronized (incomplete) { synchronized (incomplete) {
return incomplete.isEmpty(); return incomplete.isEmpty();

View File

@ -710,8 +710,12 @@ public final class RecordAccumulator {
*/ */
public void awaitFlushCompletion() throws InterruptedException { public void awaitFlushCompletion() throws InterruptedException {
try { try {
for (ProducerBatch batch : this.incomplete.copyAll()) // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
batch.produceFuture.await(); // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
// collection can occur on the contents.
// The sender will remove ProducerBatch(s) from the original incomplete collection.
for (ProduceRequestResult result : this.incomplete.requestResults())
result.await();
} finally { } finally {
this.flushesInProgress.decrementAndGet(); this.flushesInProgress.decrementAndGet();
} }