KAFKA-17455: fix stuck producer when throttling or retrying (#17527)

A producer might get stuck after it was throttled. This PR unblocks the producer by polling again
after pollDelayMs in NetworkUtils#awaitReady().

Reviewers: Matthias J. Sax <matthias@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Colt McNealy 2025-01-09 10:27:04 -08:00 committed by GitHub
parent 25fdcd05fc
commit bb22eec478
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 70 additions and 1 deletions

View File

@ -71,6 +71,17 @@ public final class NetworkClientUtils {
throw new IOException("Connection to " + node + " failed.");
}
long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow
// If the network client is waiting to send data for some reason (eg. throttling or retry backoff),
// polling longer than that is potentially dangerous as the producer will not attempt to send
// any pending requests.
long waitingTime = client.pollDelayMs(node, startTime);
if (waitingTime > 0 && pollTimeout > waitingTime) {
// Block only until the next-scheduled time that it's okay to send data to the producer,
// wake up, and try again. This is the way.
pollTimeout = waitingTime;
}
client.poll(pollTimeout, attemptStartTime);
if (client.authenticationException(node) != null)
throw client.authenticationException(node);

View File

@ -71,6 +71,7 @@ public class MockClient implements KafkaClient {
private int correlation;
private Runnable wakeupHook;
private boolean advanceTimeDuringPoll;
private final Time time;
private final MockMetadataUpdater metadataUpdater;
private final Map<String, ConnectionState> connections = new HashMap<>();
@ -138,7 +139,11 @@ public class MockClient implements KafkaClient {
@Override
public long pollDelayMs(Node node, long now) {
return connectionDelay(node, now);
return connectionState(node.idString()).pollDelayMs(now);
}
public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
this.advanceTimeDuringPoll = advanceTimeDuringPoll;
}
public void backoff(Node node, long durationMs) {
@ -336,6 +341,12 @@ public class MockClient implements KafkaClient {
copy.add(response);
}
// In real life, if poll() is called and we get to the end with no responses,
// time equal to timeoutMs would have passed.
if (advanceTimeDuringPoll) {
time.sleep(timeoutMs);
}
return copy;
}
@ -795,6 +806,13 @@ public class MockClient implements KafkaClient {
return 0;
}
long pollDelayMs(long now) {
if (notThrottled(now))
return connectionDelay(now);
return throttledUntilMs - now;
}
boolean ready(long now) {
switch (state) {
case CONNECTED:

View File

@ -482,6 +482,46 @@ public class SenderTest {
assertTrue(future.isDone(), "Request should be completed");
}
@Test
public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
// We want MockClient#poll() to advance time so that eventually the backoff expires.
try {
client.advanceTimeDuringPoll(true);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
int throttleTimeMs = 1000;
long startTime = time.milliseconds();
Node nodeToThrottle = metadata.fetch().nodeById(0);
client.throttle(nodeToThrottle, throttleTimeMs);
// Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
// as done above by throttling or with a disconnect / backoff.
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
assertEquals(currentPollDelay, throttleTimeMs);
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
assertFalse(txnManager.hasInFlightRequest());
sender.runOnce();
assertTrue(txnManager.hasInFlightRequest());
long totalTimeToRunOnce = time.milliseconds() - startTime;
// It should have blocked roughly only the backoffTimeMs and some change.
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);
} finally {
client.advanceTimeDuringPoll(false);
}
}
@Test
public void testNodeLatencyStats() throws Exception {
try (Metrics m = new Metrics()) {