KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190)

In AbstractCoordinator#joinGroupIfNeeded, the JoinGroup request is retried without proper backoff once the timer has expired. This is an uncommon scenario that possibly only appears during testing, but I think it makes sense to require the client to drive the join group via poll.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Philip Nee 2023-02-28 17:36:37 -08:00 committed by GitHub
parent 6b89672b5e
commit f7f376f6c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 4 deletions

View File

@ -508,6 +508,12 @@ public abstract class AbstractCoordinator implements Closeable {
else if (!future.isRetriable())
throw exception;
// We need to return upon expired timer, in case if the client.poll returns immediately and the time
// has elapsed.
if (timer.isExpired()) {
return false;
}
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}

View File

@ -1561,6 +1561,45 @@ public class AbstractCoordinatorTest {
}
}
/**
 * Verifies that a retriable JoinGroup error causes {@code joinGroupIfNeeded} to back off
 * before retrying, by checking how far the mock clock advanced during the call.
 *
 * <p>The first JoinGroup response carries {@code COORDINATOR_LOAD_IN_PROGRESS}; the retry and the
 * following SyncGroup succeed. The coordinator built by {@code setupCoordinator()} uses a
 * 100ms retry backoff, so roughly 100ms of mock time is expected to elapse.
 */
@Test
public void testBackoffAndRetryUponRetriableError() {
    this.mockTime = new MockTime();
    // Read the wall clock exactly once: the recorded baseline and the mock clock's starting point
    // must be the same value, otherwise the elapsed-time assertion depends on real-time jitter
    // between two separate System.currentTimeMillis() calls.
    long currentTimeMs = System.currentTimeMillis();
    this.mockTime.setCurrentTimeMs(currentTimeMs);
    setupCoordinator(); // note: uses 100ms backoff

    mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady(mockTime.timer(0));

    // Retriable Exception
    mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
    mockClient.prepareResponse(joinGroupResponse(Errors.NONE)); // Retry w/o error
    mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
    coordinator.joinGroupIfNeeded(mockTime.timer(REQUEST_TIMEOUT_MS));

    // Elapsed mock time should equal one backoff interval; the 1ms tolerance is kept from the
    // original assertion.
    assertEquals(100, mockTime.milliseconds() - currentTimeMs, 1);
}
/**
 * Verifies that {@code joinGroupIfNeeded} returns {@code false} when a retriable JoinGroup error
 * is received and the caller's timer has already expired.
 *
 * <p>The join attempt runs on a separate single-thread executor with a 500ms timer. The test
 * thread then advances the mock clock by 500ms and queues a {@code COORDINATOR_LOAD_IN_PROGRESS}
 * JoinGroup response, and asserts on the value the join attempt returns.
 *
 * @throws InterruptedException if interrupted while waiting for the executor to terminate
 */
@Test
public void testReturnUponRetriableErrorAndExpiredTimer() throws InterruptedException {
    setupCoordinator();
    mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady(mockTime.timer(0));

    ExecutorService executor = Executors.newFixedThreadPool(1);
    Timer t = mockTime.timer(500);
    try {
        Future<Boolean> attempt = executor.submit(() -> coordinator.joinGroupIfNeeded(t));
        // Expire the timer before the retriable error is delivered.
        mockTime.sleep(500);
        mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
        assertFalse(attempt.get());
    } catch (Exception e) {
        // Report what went wrong instead of failing with no diagnostic information.
        fail("Unexpected exception while waiting for joinGroupIfNeeded to return: " + e);
    } finally {
        // Always stop the worker thread so a failed run does not leak it into other tests.
        executor.shutdownNow();
        executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
    }
}
private AtomicBoolean prepareFirstHeartbeat() {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
mockClient.prepareResponse(body -> {

View File

@ -1484,7 +1484,8 @@ public abstract class ConsumerCoordinatorTest {
Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(0));
assertFalse(client.hasInFlightRequests());
coordinator.poll(time.timer(1));
assertTrue(coordinator.rejoinNeededOrPending());
client.respond(request -> {