KAFKA-10337: await async commits in commitSync even if no offsets given (#13678)

The contract for Consumer#commitSync() guarantees that the callbacks for all prior async commits will be invoked before it returns. Prior to this patch the contract could be violated if an empty offsets map were passed in to Consumer#commitSync().

Reviewers: Philip Nee <philipnee@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
Erik van Oosten 2023-06-07 16:55:03 +02:00 committed by GitHub
parent 8ad0ed3e61
commit 59d30a06fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 3 deletions

View File

@ -105,6 +105,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final boolean autoCommitEnabled;
private final int autoCommitIntervalMs;
private final ConsumerInterceptors<?, ?> interceptors;
// track number of async commits for which callback must be called
// package private for testing
final AtomicInteger inFlightAsyncCommits;
// track the number of pending async commits waiting on the coordinator lookup to complete
private final AtomicInteger pendingAsyncCommits;
// this collection must be thread-safe because it is modified from the response handler
@ -186,6 +190,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
this.inFlightAsyncCommits = new AtomicInteger();
this.pendingAsyncCommits = new AtomicInteger();
this.asyncCommitFenced = new AtomicBoolean(false);
this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
@ -1125,10 +1130,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
inFlightAsyncCommits.incrementAndGet();
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
inFlightAsyncCommits.decrementAndGet();
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
@ -1136,6 +1144,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override
public void onFailure(RuntimeException e) {
inFlightAsyncCommits.decrementAndGet();
Exception commitException = e;
if (e instanceof RetriableException) {
@ -1164,8 +1174,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty())
return true;
if (offsets.isEmpty()) {
// We guarantee that the callbacks for all commitAsync() will be invoked when
// commitSync() completes, even if the user tries to commit empty offsets.
return invokePendingAsyncCommits(timer);
}
do {
if (coordinatorUnknownAndUnreadySync(timer)) {
@ -1223,6 +1236,26 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
private boolean invokePendingAsyncCommits(Timer timer) {
if (inFlightAsyncCommits.get() == 0) {
return true;
}
do {
ensureCoordinatorReady(timer);
client.poll(timer);
invokeCompletedOffsetCommitCallbacks();
if (inFlightAsyncCommits.get() == 0) {
return true;
}
timer.sleep(rebalanceConfig.retryBackoffMs);
} while (timer.notExpired());
return false;
}
private RequestFuture<Void> autoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
@ -1245,7 +1278,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
if (autoCommitEnabled)
return autoCommitOffsetsAsync();
return null;
return null;
}
private class DefaultOffsetCommitCallback implements OffsetCommitCallback {

View File

@ -570,6 +570,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.poll(time.timer(0));
assertTrue(coordinator.coordinatorUnknown());
assertTrue(client.hasInFlightRequests());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
client.respond(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(0));
@ -577,6 +578,7 @@ public abstract class ConsumerCoordinatorTest {
// after we've discovered the coordinator we should send
// out the commit request immediately
assertTrue(client.hasInFlightRequests());
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
}
@Test
@ -600,6 +602,30 @@ public abstract class ConsumerCoordinatorTest {
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testEnsureCompletingAsyncCommitsWhenSyncCommitWithoutOffsets() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
TopicPartition tp = new TopicPartition("foo", 0);
Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(123));
final AtomicBoolean committed = new AtomicBoolean();
coordinator.commitOffsetsAsync(offsets, (committedOffsets, exception) -> {
committed.set(true);
});
assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(100L)), "expected sync commit to fail");
assertFalse(committed.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
prepareOffsetCommitRequest(singletonMap(tp, 123L), Errors.NONE);
assertTrue(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(Long.MAX_VALUE)), "expected sync commit to succeed");
assertTrue(committed.get(), "expected commit callback to be invoked");
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}
@Test
public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@ -618,11 +644,13 @@ public abstract class ConsumerCoordinatorTest {
"Unexpected exception cause type: " + (cause == null ? null : cause.getClass()));
});
}
assertEquals(coordinator.inFlightAsyncCommits.get(), numRequests);
coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(numRequests, responses.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}
@Test
@ -668,6 +696,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
assertTrue(asyncCallbackInvoked.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}
@Test
@ -2323,6 +2352,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback secondCommitCallback = new MockCommitCallback();
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 2);
respondToOffsetCommitRequest(singletonMap(t1p, 100L), error);
consumerClient.pollNoWakeup();
@ -2332,6 +2362,7 @@ public abstract class ConsumerCoordinatorTest {
assertTrue(coordinator.coordinatorUnknown());
assertTrue(firstCommitCallback.exception instanceof RetriableCommitFailedException);
assertTrue(secondCommitCallback.exception instanceof RetriableCommitFailedException);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}
@Test
@ -2494,6 +2525,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(success.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}
@Test
@ -2503,6 +2535,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertNull(mockOffsetCommitCallback.exception);
@ -2533,6 +2566,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(success.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}
@Test
@ -2542,6 +2576,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertTrue(mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
@ -2556,6 +2591,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(coordinator.coordinatorUnknown());
@ -2572,6 +2608,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(coordinator.coordinatorUnknown());
@ -2588,6 +2625,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(coordinator.coordinatorUnknown());
@ -2656,6 +2694,7 @@ public abstract class ConsumerCoordinatorTest {
}
};
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
thread.start();
client.waitForRequests(2, 5000);
@ -2663,6 +2702,7 @@ public abstract class ConsumerCoordinatorTest {
respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()), Errors.NONE);
thread.join();
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
}
@ -3415,6 +3455,7 @@ public abstract class ConsumerCoordinatorTest {
assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException);
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback()));
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
}
@ -3680,6 +3721,7 @@ public abstract class ConsumerCoordinatorTest {
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
}