mirror of https://github.com/apache/kafka.git
KAFKA-19259: Async consumer fetch intermittent delays on console consumer (#19980)
There’s a difference in the two consumers’ `pollForFetches()` methods in this case: `ClassicKafkaConsumer` doesn't block waiting for data in the fetch buffer, but `AsyncKafkaConsumer` does. In `ClassicKafkaConsumer.pollForFetches()`, after enqueuing the `FETCH` request, the consumer makes a call to `ConsumerNetworkClient.poll()`. In most cases `poll()` returns almost immediately because it successfully sent the `FETCH` request. So even when the `pollTimeout` value is, e.g., 3000, the call to `ConsumerNetworkClient.poll()` doesn't block that long waiting for a response. In contrast, after sending out a `FETCH` request, `AsyncKafkaConsumer` calls `FetchBuffer.awaitNotEmpty()` and blocks there for the full length of the timeout. In some cases, the response to the `FETCH` comes back with no results, which doesn't unblock `FetchBuffer.awaitNotEmpty()`. Because the application thread is still waiting for data in the buffer, it remains blocked, preventing any more `FETCH` requests from being sent and causing the long pauses in the console consumer. The fix replaces `FetchBuffer.awaitNotEmpty()` with `FetchBuffer.awaitWakeup()` and wakes the fetch buffer on every successful `FETCH` response, even an empty one, so the application thread stops waiting and can issue the next `FETCH`.

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
b92d47d487
commit
f922ff6d1f
|
@ -63,6 +63,7 @@ import java.util.Set;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -109,6 +110,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
@ClusterTestDefaults(
|
||||
types = {Type.KRAFT},
|
||||
|
@ -1593,6 +1595,75 @@ public class PlaintextConsumerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testClassicConsumerStallBetweenPoll() throws Exception {
|
||||
testStallBetweenPoll(GroupProtocol.CLASSIC);
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testAsyncConsumerStallBetweenPoll() throws Exception {
|
||||
testStallBetweenPoll(GroupProtocol.CONSUMER);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test is to prove that the intermittent stalling that has been experienced when using the asynchronous
|
||||
* consumer, as filed under KAFKA-19259, have been fixed.
|
||||
*
|
||||
* <p/>
|
||||
*
|
||||
* The basic idea is to have one thread that produces a record every 500 ms. and the main thread that consumes
|
||||
* records without pausing between polls for much more than the produce delay. In the test case filed in
|
||||
* KAFKA-19259, the consumer sometimes pauses for up to 5-10 seconds despite records being produced every second.
|
||||
*/
|
||||
private void testStallBetweenPoll(GroupProtocol groupProtocol) throws Exception {
|
||||
var testTopic = "stall-test-topic";
|
||||
var numPartitions = 6;
|
||||
cluster.createTopic(testTopic, numPartitions, (short) BROKER_COUNT);
|
||||
|
||||
// The producer must produce slowly to tickle the scenario.
|
||||
var produceDelay = 500;
|
||||
|
||||
var executor = Executors.newScheduledThreadPool(1);
|
||||
|
||||
try (var producer = cluster.producer()) {
|
||||
// Start a thread running that produces records at a relative trickle.
|
||||
executor.scheduleWithFixedDelay(
|
||||
() -> producer.send(new ProducerRecord<>(testTopic, TestUtils.randomBytes(64))),
|
||||
0,
|
||||
produceDelay,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
|
||||
Map<String, Object> consumerConfig = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
|
||||
|
||||
// Assign a tolerance for how much time is allowed to pass between Consumer.poll() calls given that there
|
||||
// should be *at least* one record to read every second.
|
||||
var pollDelayTolerance = 2000;
|
||||
|
||||
try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
|
||||
consumer.subscribe(List.of(testTopic));
|
||||
|
||||
// This is here to allow the consumer time to settle the group membership/assignment.
|
||||
awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
|
||||
|
||||
// Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
|
||||
// exceed the delay threshold defined above.
|
||||
var beforePoll = System.currentTimeMillis();
|
||||
consumer.poll(Duration.ofSeconds(5));
|
||||
consumer.poll(Duration.ofSeconds(5));
|
||||
var afterPoll = System.currentTimeMillis();
|
||||
var pollDelay = afterPoll - beforePoll;
|
||||
|
||||
if (pollDelay > pollDelayTolerance)
|
||||
fail("Detected a stall of " + pollDelay + " ms between Consumer.poll() invocations despite a Producer producing records every " + produceDelay + " ms");
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
// Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
|
||||
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor did not terminate");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
|
||||
Consumer<byte[], byte[]> consumer,
|
||||
TopicPartition tp
|
||||
|
|
|
@ -147,6 +147,7 @@ public abstract class AbstractFetch implements Closeable {
|
|||
* @param data {@link FetchSessionHandler.FetchRequestData} that represents the session data
|
||||
* @param resp {@link ClientResponse} from which the {@link FetchResponse} will be retrieved
|
||||
*/
|
||||
@SuppressWarnings("NPathComplexity")
|
||||
protected void handleFetchSuccess(final Node fetchTarget,
|
||||
final FetchSessionHandler.FetchRequestData data,
|
||||
final ClientResponse resp) {
|
||||
|
@ -174,6 +175,8 @@ public abstract class AbstractFetch implements Closeable {
|
|||
final Set<TopicPartition> partitions = new HashSet<>(responseData.keySet());
|
||||
final FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions);
|
||||
|
||||
boolean needsWakeup = true;
|
||||
|
||||
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
|
||||
for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
|
||||
TopicPartition partition = entry.getKey();
|
||||
|
@ -220,8 +223,14 @@ public abstract class AbstractFetch implements Closeable {
|
|||
metricAggregator,
|
||||
fetchOffset);
|
||||
fetchBuffer.add(completedFetch);
|
||||
needsWakeup = false;
|
||||
}
|
||||
|
||||
// "Wake" the fetch buffer on any response, even if it's empty, to allow the consumer to not block
|
||||
// indefinitely waiting on the fetch buffer to get data.
|
||||
if (needsWakeup)
|
||||
fetchBuffer.wakeup();
|
||||
|
||||
if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
|
||||
List<Node> leaderNodes = new ArrayList<>();
|
||||
|
||||
|
|
|
@ -1811,7 +1811,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
// use of a shorter, dedicated "pollTimer" here which updates "timer" so that calling method (poll) will
|
||||
// correctly handle the overall timeout.
|
||||
try {
|
||||
fetchBuffer.awaitNotEmpty(pollTimer);
|
||||
fetchBuffer.awaitWakeup(pollTimer);
|
||||
} catch (InterruptException e) {
|
||||
log.trace("Interrupt during fetch", e);
|
||||
throw e;
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.slf4j.Logger;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -51,7 +52,7 @@ public class FetchBuffer implements AutoCloseable {
|
|||
private final Logger log;
|
||||
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
|
||||
private final Lock lock;
|
||||
private final Condition notEmptyCondition;
|
||||
private final Condition blockingCondition;
|
||||
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
|
||||
|
||||
private final AtomicBoolean wokenup = new AtomicBoolean(false);
|
||||
|
@ -62,7 +63,7 @@ public class FetchBuffer implements AutoCloseable {
|
|||
this.log = logContext.logger(FetchBuffer.class);
|
||||
this.completedFetches = new ConcurrentLinkedQueue<>();
|
||||
this.lock = new ReentrantLock();
|
||||
this.notEmptyCondition = lock.newCondition();
|
||||
this.blockingCondition = lock.newCondition();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,13 +96,7 @@ public class FetchBuffer implements AutoCloseable {
|
|||
}
|
||||
|
||||
void add(CompletedFetch completedFetch) {
|
||||
try {
|
||||
lock.lock();
|
||||
completedFetches.add(completedFetch);
|
||||
notEmptyCondition.signalAll();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
addAll(List.of(completedFetch));
|
||||
}
|
||||
|
||||
void addAll(Collection<CompletedFetch> completedFetches) {
|
||||
|
@ -111,7 +106,8 @@ public class FetchBuffer implements AutoCloseable {
|
|||
try {
|
||||
lock.lock();
|
||||
this.completedFetches.addAll(completedFetches);
|
||||
notEmptyCondition.signalAll();
|
||||
wokenup.set(true);
|
||||
blockingCondition.signalAll();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -154,23 +150,23 @@ public class FetchBuffer implements AutoCloseable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Allows the caller to await presence of data in the buffer. The method will block, returning only
|
||||
* Allows the caller to await a response from the broker for requested data. The method will block, returning only
|
||||
* under one of the following conditions:
|
||||
*
|
||||
* <ol>
|
||||
* <li>The buffer was already non-empty on entry</li>
|
||||
* <li>The buffer was populated during the wait</li>
|
||||
* <li>The buffer was already woken</li>
|
||||
* <li>The buffer was woken during the wait</li>
|
||||
* <li>The remaining time on the {@link Timer timer} elapsed</li>
|
||||
* <li>The thread was interrupted</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param timer Timer that provides time to wait
|
||||
*/
|
||||
void awaitNotEmpty(Timer timer) {
|
||||
void awaitWakeup(Timer timer) {
|
||||
try {
|
||||
lock.lock();
|
||||
|
||||
while (isEmpty() && !wokenup.compareAndSet(true, false)) {
|
||||
while (!wokenup.compareAndSet(true, false)) {
|
||||
// Update the timer before we head into the loop in case it took a while to get the lock.
|
||||
timer.update();
|
||||
|
||||
|
@ -185,7 +181,7 @@ public class FetchBuffer implements AutoCloseable {
|
|||
break;
|
||||
}
|
||||
|
||||
if (!notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
|
||||
if (!blockingCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -198,10 +194,10 @@ public class FetchBuffer implements AutoCloseable {
|
|||
}
|
||||
|
||||
void wakeup() {
|
||||
wokenup.set(true);
|
||||
try {
|
||||
lock.lock();
|
||||
notEmptyCondition.signalAll();
|
||||
wokenup.set(true);
|
||||
blockingCondition.signalAll();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
|
|
@ -177,7 +177,7 @@ public class FetchBufferTest {
|
|||
try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
|
||||
final Thread waitingThread = new Thread(() -> {
|
||||
final Timer timer = time.timer(Duration.ofMinutes(1));
|
||||
fetchBuffer.awaitNotEmpty(timer);
|
||||
fetchBuffer.awaitWakeup(timer);
|
||||
});
|
||||
waitingThread.start();
|
||||
fetchBuffer.wakeup();
|
||||
|
|
Loading…
Reference in New Issue