MINOR: fix custom retry backoff in new group coordinator (#15170)

When a retryable write operation fails, we retry with the default 500ms backoff. If a custom retry backoff was used to originally schedule the operation, we should retry with the same custom backoff instead of the default.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Jeff Kim 2024-01-11 03:28:32 -05:00 committed by GitHub
parent 13a83d58f8
commit cd3b3d9804
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 1 deletions

View File

@ -343,7 +343,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
if (retry) { if (retry) {
log.info("The write event {} for the timer {} failed due to {}. Rescheduling it. ", log.info("The write event {} for the timer {} failed due to {}. Rescheduling it. ",
event.name, key, ex.getMessage()); event.name, key, ex.getMessage());
schedule(key, retryBackoff, TimeUnit.MILLISECONDS, retry, operation); schedule(key, retryBackoff, TimeUnit.MILLISECONDS, true, retryBackoff, operation);
} else { } else {
log.error("The write event {} for the timer {} failed due to {}. Ignoring it. ", log.error("The write event {} for the timer {} failed due to {}. Ignoring it. ",
event.name, key, ex.getMessage()); event.name, key, ex.getMessage());

View File

@ -1986,6 +1986,79 @@ public class CoordinatorRuntimeTest {
assertEquals(0, ctx.timer.size()); assertEquals(0, ctx.timer.size());
} }
@Test
public void testRetryableTimerWithCustomBackoff() throws InterruptedException {
MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10);
// Check initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.timer.size());
// Timer #1.
AtomicInteger cnt = new AtomicInteger(0);
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, 1000, () -> {
cnt.incrementAndGet();
throw new KafkaException("error");
});
// The coordinator timer should have one pending task.
assertEquals(1, ctx.timer.size());
// Advance time to fire the pending timer.
timer.advanceClock(10 + 1);
// The timer should have been called and the timer should have one pending task.
assertEquals(1, cnt.get());
assertEquals(1, ctx.timer.size());
// Advance past the default retry backoff.
timer.advanceClock(500 + 1);
// The timer should not have been called yet.
assertEquals(1, cnt.get());
assertEquals(1, ctx.timer.size());
// Advance past the custom retry.
timer.advanceClock(500 + 1);
// The timer should have been called and the timer should have one pending task.
assertEquals(2, cnt.get());
assertEquals(1, ctx.timer.size());
// Advance past the default retry backoff.
timer.advanceClock(500 + 1);
// The timer should not have been called yet.
assertEquals(2, cnt.get());
assertEquals(1, ctx.timer.size());
// Advance past the custom retry.
timer.advanceClock(500 + 1);
// The timer should have been called and the timer should have one pending task.
assertEquals(3, cnt.get());
assertEquals(1, ctx.timer.size());
// Cancel Timer #1.
ctx.timer.cancel("timer-1");
assertEquals(0, ctx.timer.size());
}
@Test @Test
public void testNonRetryableTimer() throws InterruptedException { public void testNonRetryableTimer() throws InterruptedException {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();