KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread (#17150)

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>, TengYao Chi <kitingiao@gmail.com>
This commit is contained in:
PoAn Yang 2024-11-07 22:45:44 +08:00 committed by GitHub
parent 0181073d49
commit b213c64f97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 434 additions and 221 deletions

View File

@ -249,7 +249,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
// Last triggered async commit future. Used to wait until all previous async commits are completed.
// We only need to keep track of the last one, since they are guaranteed to complete in order.
private CompletableFuture<Void> lastPendingAsyncCommit = null;
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> lastPendingAsyncCommit = null;
// currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer
// and is used to prevent multithreaded access
@ -752,43 +752,43 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
@Override
public void commitAsync(OffsetCommitCallback callback) {
commitAsync(subscriptions.allConsumed(), callback);
commitAsync(Optional.empty(), callback);
}
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
commitAsync(Optional.of(offsets), callback);
}
private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((r, t) -> {
lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((committedOffsets, throwable) -> {
if (t == null) {
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
if (throwable == null) {
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(committedOffsets);
}
if (callback == null) {
if (t != null) {
log.error("Offset commit with offsets {} failed", offsets, t);
if (throwable != null) {
log.error("Offset commit with offsets {} failed", committedOffsets, throwable);
}
return;
}
offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, offsets, (Exception) t);
offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, committedOffsets, (Exception) throwable);
});
} finally {
release();
}
}
private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final CommitEvent commitEvent) {
maybeThrowInvalidGroupIdException();
offsetCommitCallbackInvoker.executeCallbacks();
Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
log.debug("Committing offsets: {}", offsets);
offsets.forEach(this::updateLastSeenEpochIfNewer);
if (offsets.isEmpty()) {
if (commitEvent.offsets().isPresent() && commitEvent.offsets().get().isEmpty()) {
return CompletableFuture.completedFuture(null);
}
@ -828,7 +828,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
} else {
log.info("Seeking to offset {} for partition {}", offset, partition);
}
updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
Timer timer = time.timer(defaultApiTimeoutMs);
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
@ -914,9 +913,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
calculateDeadlineMs(time, timeout));
wakeupTrigger.setActiveTask(event.future());
try {
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event);
committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
return committedOffsets;
return applicationEventHandler.addAndGet(event);
} catch (TimeoutException e) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partitions " + partitions + " could be determined. Try tuning " +
@ -1294,13 +1291,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
// Visible for testing
void commitSyncAllConsumed(final Timer timer) {
Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
log.debug("Sending synchronous auto-commit on closing");
try {
commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
commitSync(Duration.ofMillis(timer.remainingMs()));
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
log.warn("Synchronous auto-commit failed", e);
}
timer.update();
}
@ -1318,28 +1314,32 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
*/
@Override
public void commitSync(final Duration timeout) {
commitSync(subscriptions.allConsumed(), timeout);
commitSync(Optional.empty(), timeout);
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
commitSync(Optional.of(offsets), Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
commitSync(Optional.of(offsets), timeout);
}
private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, Duration timeout) {
acquireAndEnsureOpen();
long commitStart = time.nanoseconds();
try {
SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));
CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitFuture = commit(syncCommitEvent);
Timer requestTimer = time.timer(timeout.toMillis());
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true);
wakeupTrigger.setActiveTask(commitFuture);
ConsumerUtils.getResult(commitFuture, requestTimer);
interceptors.onCommit(offsets);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = ConsumerUtils.getResult(commitFuture, requestTimer);
interceptors.onCommit(committedOffsets);
} finally {
wakeupTrigger.clearTask();
kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
@ -1588,11 +1588,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
return groupMetadata.get().isPresent();
}
private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
if (offsetAndMetadata != null)
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
/**
* This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
*

View File

@ -72,6 +72,7 @@ import static org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRE
public class CommitRequestManager implements RequestManager, MemberStateListener {
private final Time time;
private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
private final LogContext logContext;
private final Logger log;
private final Optional<AutoCommitState> autoCommitState;
@ -102,15 +103,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
private final MemberInfo memberInfo;
public CommitRequestManager(
final Time time,
final LogContext logContext,
final SubscriptionState subscriptions,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
final String groupId,
final Optional<String> groupInstanceId,
final Metrics metrics) {
final Time time,
final LogContext logContext,
final SubscriptionState subscriptions,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
final String groupId,
final Optional<String> groupInstanceId,
final Metrics metrics,
final ConsumerMetadata metadata) {
this(time,
logContext,
subscriptions,
@ -122,7 +124,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
OptionalDouble.empty(),
metrics);
metrics,
metadata);
}
// Visible for testing
@ -138,7 +141,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
final long retryBackoffMs,
final long retryBackoffMaxMs,
final OptionalDouble jitter,
final Metrics metrics) {
final Metrics metrics,
final ConsumerMetadata metadata) {
Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
this.time = time;
this.logContext = logContext;
@ -155,6 +159,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
this.groupId = groupId;
this.groupInstanceId = groupInstanceId;
this.subscriptions = subscriptions;
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.retryBackoffMaxMs = retryBackoffMaxMs;
this.jitter = jitter;
@ -381,20 +386,22 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* exceptionally depending on the response. If the request fails with a retriable error, the
* future will be completed with a {@link RetriableCommitFailedException}.
*/
public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty()) {
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
if (commitOffsets.isEmpty()) {
log.debug("Skipping commit of empty offsets");
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(Map.of());
}
OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Long.MAX_VALUE);
maybeUpdateLastSeenEpochIfNewer(commitOffsets);
OffsetCommitRequestState commitRequest = createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
pendingRequests.addOffsetCommitRequest(commitRequest);
CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> asyncCommitResult = new CompletableFuture<>();
commitRequest.future.whenComplete((committedOffsets, error) -> {
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
asyncCommitResult.complete(null);
asyncCommitResult.complete(commitOffsets);
}
});
return asyncCommitResult;
@ -403,15 +410,20 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
/**
* Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired.
*
* @param offsets Offsets to commit
* @param deadlineMs Time until which the request will be retried if it fails with
* an expected retriable error.
* @param offsets Offsets to commit
* @param deadlineMs Time until which the request will be retried if it fails with
* an expected retriable error.
* @return Future that will complete when a successful response
*/
public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
final long deadlineMs) {
CompletableFuture<Void> result = new CompletableFuture<>();
OffsetCommitRequestState requestState = createOffsetCommitRequest(offsets, deadlineMs);
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
final long deadlineMs) {
Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
if (commitOffsets.isEmpty()) {
return CompletableFuture.completedFuture(Map.of());
}
maybeUpdateLastSeenEpochIfNewer(commitOffsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
OffsetCommitRequestState requestState = createOffsetCommitRequest(commitOffsets, deadlineMs);
commitSyncWithRetries(requestState, result);
return result;
}
@ -439,14 +451,14 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
}
private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
CompletableFuture<Void> result) {
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
pendingRequests.addOffsetCommitRequest(requestAttempt);
// Retry the same commit request while it fails with RetriableException and the retry
// timeout hasn't expired.
requestAttempt.future.whenComplete((res, error) -> {
if (error == null) {
result.complete(null);
result.complete(requestAttempt.offsets);
} else {
if (error instanceof RetriableException) {
if (requestAttempt.isExpired()) {
@ -531,6 +543,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
"outbound buffer:" + fetchRequest);
}
if (error == null) {
maybeUpdateLastSeenEpochIfNewer(res);
result.complete(res);
} else {
if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
@ -615,6 +628,13 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
}
private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((topicPartition, offsetAndMetadata) -> {
if (offsetAndMetadata != null)
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
});
}
class OffsetCommitRequestState extends RetriableRequestState {
private Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;

View File

@ -205,7 +205,8 @@ public class RequestManagers implements Closeable {
offsetCommitCallbackInvoker,
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,
metrics);
metrics,
metadata);
membershipManager = new ConsumerMembershipManager(
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,

View File

@ -198,29 +198,41 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
private void process(final AsyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().completeExceptionally(new KafkaException("Unable to async commit " +
"offset because the CommitRequestManager is not available. Check if group.id was set correctly"));
return;
}
CommitRequestManager manager = requestManagers.commitRequestManager.get();
CompletableFuture<Void> future = manager.commitAsync(event.offsets());
future.whenComplete(complete(event.future()));
try {
CommitRequestManager manager = requestManagers.commitRequestManager.get();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitAsync(event.offsets());
future.whenComplete(complete(event.future()));
} catch (Exception e) {
event.future().completeExceptionally(e);
}
}
private void process(final SyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().completeExceptionally(new KafkaException("Unable to sync commit " +
"offset because the CommitRequestManager is not available. Check if group.id was set correctly"));
return;
}
CommitRequestManager manager = requestManagers.commitRequestManager.get();
CompletableFuture<Void> future = manager.commitSync(event.offsets(), event.deadlineMs());
future.whenComplete(complete(event.future()));
try {
CommitRequestManager manager = requestManagers.commitRequestManager.get();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(event.offsets(), event.deadlineMs());
future.whenComplete(complete(event.future()));
} catch (Exception e) {
event.future().completeExceptionally(e);
}
}
private void process(final FetchCommittedOffsetsEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
"offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
"offset because the CommitRequestManager is not available. Check if group.id was set correctly"));
return;
}
CommitRequestManager manager = requestManagers.commitRequestManager.get();
@ -523,6 +535,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private void process(final SeekUnvalidatedEvent event) {
try {
event.offsetEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
event.offset(),
event.offsetEpoch(),

View File

@ -20,13 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.Optional;
/**
* Event to commit offsets without waiting for a response, so the request won't be retried.
* If no offsets are provided, this event will commit all consumed offsets.
*/
public class AsyncCommitEvent extends CommitEvent {
public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) {
public AsyncCommitEvent(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
super(Type.COMMIT_ASYNC, offsets, Long.MAX_VALUE);
}
}

View File

@ -21,15 +21,16 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
public abstract class CommitEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
/**
* Offsets to commit per partition.
*/
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets;
protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
protected CommitEvent(final Type type, final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, final long deadlineMs) {
super(type, deadlineMs);
this.offsets = validate(offsets);
}
@ -38,17 +39,21 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
* Validates the offsets are not negative and then returns the given offset map as
* {@link Collections#unmodifiableMap(Map) as unmodifiable}.
*/
private static Map<TopicPartition, OffsetAndMetadata> validate(final Map<TopicPartition, OffsetAndMetadata> offsets) {
for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
private static Optional<Map<TopicPartition, OffsetAndMetadata>> validate(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
if (!offsets.isPresent()) {
return Optional.empty();
}
for (OffsetAndMetadata offsetAndMetadata : offsets.get().values()) {
if (offsetAndMetadata.offset() < 0) {
throw new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset());
}
}
return Collections.unmodifiableMap(offsets);
return Optional.of(Collections.unmodifiableMap(offsets.get()));
}
public Map<TopicPartition, OffsetAndMetadata> offsets() {
public Optional<Map<TopicPartition, OffsetAndMetadata>> offsets() {
return offsets;
}

View File

@ -20,14 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.Optional;
/**
* Event to commit offsets waiting for a response and retrying on expected retriable errors until
* the timer expires.
* the timer expires. If no offsets are provided, this event will commit all consumed offsets.
*/
public class SyncCommitEvent extends CommitEvent {
public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
public SyncCommitEvent(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, final long deadlineMs) {
super(Type.COMMIT_SYNC, offsets, deadlineMs);
}
}

View File

@ -54,7 +54,6 @@ import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChang
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@ -185,8 +184,22 @@ public class AsyncKafkaConsumerTest {
}
private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
// disable auto-commit by default, so we don't need to handle SyncCommitEvent for each case
if (!props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}
final ConsumerConfig config = new ConsumerConfig(props);
return newConsumer(config);
return new AsyncKafkaConsumer<>(
config,
new StringDeserializer(),
new StringDeserializer(),
time,
(a, b, c, d, e, f, g) -> applicationEventHandler,
a -> backgroundEventReaper,
(a, b, c, d, e, f, g) -> fetchCollector,
(a, b, c, d) -> metadata,
backgroundEventQueue
);
}
private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
@ -209,10 +222,10 @@ public class AsyncKafkaConsumerTest {
ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
SubscriptionState subscriptions,
String groupId,
String clientId) {
String clientId,
boolean autoCommitEnabled) {
long retryBackoffMs = 100L;
int defaultApiTimeoutMs = 1000;
boolean autoCommitEnabled = true;
return new AsyncKafkaConsumer<>(
new LogContext(),
clientId,
@ -281,8 +294,10 @@ public class AsyncKafkaConsumerTest {
final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
verify(applicationEventHandler).add(commitEventCaptor.capture());
final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
assertEquals(offsets, commitEvent.offsets());
assertDoesNotThrow(() -> commitEvent.future().complete(null));
assertTrue(commitEvent.offsets().isPresent());
assertEquals(offsets, commitEvent.offsets().get());
commitEvent.future().complete(offsets);
assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
// Clean-up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
@ -347,25 +362,6 @@ public class AsyncKafkaConsumerTest {
assertTrue((double) metric.metricValue() > 0);
}
@Test
public void testCommittedLeaderEpochUpdate() {
consumer = newConsumer();
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
final TopicPartition t2 = new TopicPartition("t0", 4);
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
topicPartitionOffsets.put(t1, null);
topicPartitionOffsets.put(t2, new OffsetAndMetadata(20L, Optional.of(3), ""));
completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);
assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
}
@Test
public void testCommittedExceptionThrown() {
consumer = newConsumer();
@ -388,7 +384,6 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition(topicName, partition);
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@ -410,7 +405,6 @@ public class AsyncKafkaConsumerTest {
return Fetch.empty();
}).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@ -434,7 +428,6 @@ public class AsyncKafkaConsumerTest {
return Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), ""));
}).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@ -492,7 +485,6 @@ public class AsyncKafkaConsumerTest {
doReturn(Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), "")))
.when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
completeCommitSyncApplicationEventSuccessfully();
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
@ -527,71 +519,6 @@ public class AsyncKafkaConsumerTest {
assertThrows(callbackException.getClass(), () -> consumer.commitSync());
}
@Test
public void testCommitSyncLeaderEpochUpdate() {
consumer = newConsumer();
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), ""));
completeCommitSyncApplicationEventSuccessfully();
completeAssignmentChangeEventSuccessfully();
consumer.assign(Arrays.asList(t0, t1));
assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
}
@Test
public void testCommitAsyncLeaderEpochUpdate() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
consumer = newConsumer(
mock(FetchBuffer.class),
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
completeCommitSyncApplicationEventSuccessfully();
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), ""));
when(metadata.currentLeader(t0)).thenReturn(
new LeaderAndEpoch(Optional.of(
new Node(1, "host", 9000)), Optional.of(1)));
when(metadata.currentLeader(t1)).thenReturn(
new LeaderAndEpoch(Optional.of(
new Node(1, "host", 9000)), Optional.of(1)));
completeAssignmentChangeEventSuccessfully();
consumer.assign(Arrays.asList(t0, t1));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(t0, 10);
consumer.seek(t1, 20);
MockCommitCallback callback = new MockCommitCallback();
assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback));
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
// Clean-Up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
// If we get an error *other* than the TimeoutException, we'll fail the test.
try {
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
assertInstanceOf(TimeoutException.class, e.getCause());
} finally {
consumer = null;
}
}
@Test
public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
final TopicPartition tp = new TopicPartition("foo", 0);
@ -722,7 +649,8 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id"));
"client-id",
false));
completeUnsubscribeApplicationEventSuccessfully();
consumer.close(Duration.ZERO);
verifyUnsubscribeEvent(subscriptions);
@ -739,7 +667,8 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id"));
"client-id",
false));
doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
verifyUnsubscribeEvent(subscriptions);
@ -748,8 +677,7 @@ public class AsyncKafkaConsumerTest {
}
@Test
public void testAutoCommitSyncEnabled() {
completeCommitSyncApplicationEventSuccessfully();
public void testCommitSyncAllConsumed() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
consumer = newConsumer(
mock(FetchBuffer.class),
@ -757,14 +685,19 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id",
false);
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.commitSyncAllConsumed(time.timer(100));
verify(applicationEventHandler).add(any(SyncCommitEvent.class));
ArgumentCaptor<SyncCommitEvent> eventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
verify(applicationEventHandler).add(eventCaptor.capture());
SyncCommitEvent capturedEvent = eventCaptor.getValue();
assertFalse(capturedEvent.offsets().isPresent(), "Expected empty optional offsets");
}
@Test
@ -776,12 +709,15 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id",
false);
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
completeUnsubscribeApplicationEventSuccessfully();
consumer.close();
verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class));
}
@ -1035,7 +971,9 @@ public class AsyncKafkaConsumerTest {
@Test
public void testNoWakeupInCloseCommit() {
TopicPartition tp = new TopicPartition("topic1", 0);
consumer = newConsumer();
Properties props = requiredConsumerConfigAndGroupId("consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumer = newConsumer(props);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
@ -1280,8 +1218,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
consumer = newConsumer(config);
consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
@ -1297,8 +1234,7 @@ public class AsyncKafkaConsumerTest {
final String groupInstanceId = "groupInstanceId1";
final Properties props = requiredConsumerConfigAndGroupId(groupId);
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
consumer = newConsumer(props);
final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
@ -1333,9 +1269,8 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataUpdate() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
consumer = newConsumer(config);
consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final ConsumerGroupMetadata oldGroupMetadata = consumer.groupMetadata();
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
final int expectedMemberEpoch = 42;
@ -1355,9 +1290,8 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataIsResetAfterUnsubscribe() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
consumer = newConsumer(config);
consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
consumer.subscribe(singletonList("topic"));
final int memberEpoch = 42;
@ -1479,8 +1413,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testBackgroundError() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
consumer = newConsumer(config);
consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
final ErrorEvent errorEvent = new ErrorEvent(expectedException);
@ -1495,8 +1428,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testMultipleBackgroundErrors() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
consumer = newConsumer(config);
consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
final ErrorEvent errorEvent1 = new ErrorEvent(expectedException1);
@ -1589,7 +1521,8 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id",
false);
final TopicPartition tp = new TopicPartition("topic", 0);
final List<ConsumerRecord<String, String>> records = singletonList(
new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));

View File

@ -87,6 +87,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -103,6 +104,7 @@ public class CommitRequestManagerTest {
private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
private final Node mockedNode = new Node(1, "host1", 9092);
private SubscriptionState subscriptionState;
private ConsumerMetadata metadata;
private LogContext logContext;
private MockTime time;
private CoordinatorRequestManager coordinatorRequestManager;
@ -118,6 +120,7 @@ public class CommitRequestManagerTest {
this.logContext = new LogContext();
this.time = new MockTime(0);
this.subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
this.metadata = mock(ConsumerMetadata.class);
this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class);
this.props = new Properties();
@ -142,7 +145,8 @@ public class CommitRequestManagerTest {
retryBackoffMs,
retryBackoffMaxMs,
OptionalDouble.of(0),
metrics);
metrics,
metadata);
commitRequestManager.onMemberEpochUpdated(Optional.of(1), Uuid.randomUuid().toString());
Set<TopicPartition> requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1));
@ -175,7 +179,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
commitRequestManager.commitAsync(offsets);
commitRequestManager.commitAsync(Optional.of(offsets));
assertPoll(false, 0, commitRequestManager);
}
@ -186,7 +190,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
commitRequestManager.commitAsync(offsets);
commitRequestManager.commitAsync(Optional.of(offsets));
assertPoll(false, 0, commitRequestManager);
assertPoll(true, 1, commitRequestManager);
}
@ -198,7 +202,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
commitRequestManager.commitAsync(offsets);
commitRequestManager.commitAsync(Optional.of(offsets));
assertPoll(1, commitRequestManager);
}
@ -239,9 +243,9 @@ public class CommitRequestManagerTest {
// Add the requests to the CommitRequestManager and store their futures
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
commitManager.commitSync(offsets1, deadlineMs);
commitManager.commitSync(Optional.of(offsets1), deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs);
commitManager.commitSync(offsets2, deadlineMs);
commitManager.commitSync(Optional.of(offsets2), deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), deadlineMs);
// Poll the CommitRequestManager and verify that the inflightOffsetFetches size is correct
@ -274,13 +278,146 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
commitRequestManager.commitAsync(offsets);
commitRequestManager.commitAsync(Optional.of(offsets));
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
assertEquals(1, commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
assertEmptyPendingRequests(commitRequestManager);
}
@Test
public void testCommitSync() {
subscriptionState = mock(SubscriptionState.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition tp = new TopicPartition("topic", 1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), "");
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
CommitRequestManager commitRequestManager = create(false, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitSync(
Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"topic",
1,
(short) 1,
Errors.NONE)));
verify(subscriptionState, never()).allConsumed();
verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertTrue(future.isDone());
assertEquals(offsets, commitOffsets);
}
@Test
public void testCommitSyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition tp = new TopicPartition("topic", 1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), "");
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
doReturn(offsets).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(false, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitSync(
Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"topic",
1,
(short) 1,
Errors.NONE)));
verify(subscriptionState).allConsumed();
verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertTrue(future.isDone());
assertEquals(offsets, commitOffsets);
}
@Test
public void testCommitSyncWithEmptyAllConsumedOffsets() {
subscriptionState = mock(SubscriptionState.class);
doReturn(Map.of()).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitSync(
Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
verify(subscriptionState).allConsumed();
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertTrue(future.isDone());
assertTrue(commitOffsets.isEmpty());
}
@Test
public void testCommitAsync() {
subscriptionState = mock(SubscriptionState.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition tp = new TopicPartition("topic", 1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), "");
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.of(offsets));
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"topic",
1,
(short) 1,
Errors.NONE)));
verify(subscriptionState, never()).allConsumed();
verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
assertTrue(future.isDone());
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertEquals(offsets, commitOffsets);
}
@Test
public void testCommitAsyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition tp = new TopicPartition("topic", 1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), "");
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
doReturn(offsets).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty());
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"topic",
1,
(short) 1,
Errors.NONE)));
verify(subscriptionState).allConsumed();
verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
assertTrue(future.isDone());
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertEquals(offsets, commitOffsets);
}
@Test
public void testCommitAsyncWithEmptyAllConsumedOffsets() {
subscriptionState = mock(SubscriptionState.class);
doReturn(Map.of()).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty());
verify(subscriptionState).allConsumed();
assertTrue(future.isDone());
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertTrue(commitOffsets.isEmpty());
}
// This is the case of the async auto commit sent on calls to assign (async commit that
// should not be retried).
@Test
@ -333,7 +470,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, error, commitResult);
// We expect that request should have been retried on this sync commit.
@ -359,7 +496,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.UNKNOWN_MEMBER_ID);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
@ -378,8 +515,8 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Send commit request expected to be retried on retriable errors
CompletableFuture<Void> commitResult = commitRequestManager.commitSync(
offsets, time.milliseconds() + defaultApiTimeoutMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(
Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@ -431,7 +568,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Async commit that won't be retried.
CompletableFuture<Void> commitResult = commitRequestManager.commitAsync(offsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -781,7 +918,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Send async commit (not expected to be retried).
CompletableFuture<Void> commitResult = commitRequestManager.commitAsync(offsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
completeOffsetCommitRequestWithError(commitRequestManager, error);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@ -806,7 +943,7 @@ public class CommitRequestManagerTest {
// Send sync offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT);
// Request retried after backoff, and fails with retriable again. Should not complete yet
@ -839,7 +976,7 @@ public class CommitRequestManagerTest {
// Send offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, error);
// Sleep to expire the request timeout. Request should fail on the next poll with a
@ -869,7 +1006,7 @@ public class CommitRequestManagerTest {
// Send async commit request that fails with retriable error (not expected to be retried).
Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
CompletableFuture<Void> commitResult = commitRequestManager.commitAsync(offsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
completeOffsetCommitRequestWithError(commitRequestManager, retriableError);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@ -894,7 +1031,7 @@ public class CommitRequestManagerTest {
offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
commitRequestManager.commitSync(offsets, time.milliseconds() + defaultApiTimeoutMs);
commitRequestManager.commitSync(Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -919,7 +1056,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
commitRequestManager.commitSync(offsets, deadlineMs);
commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException());
@ -1175,7 +1312,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long commitCreationTimeMs = time.milliseconds();
commitRequestManager.commitAsync(offsets);
commitRequestManager.commitAsync(Optional.of(offsets));
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -1338,7 +1475,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
commitRequestManager.commitAsync(offsets);
commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.signalClose();
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -1384,7 +1521,7 @@ public class CommitRequestManagerTest {
private void sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(
final CommitRequestManager commitRequestManager,
final Errors error,
final CompletableFuture<Void> commitResult) {
final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult) {
completeOffsetCommitRequestWithError(commitRequestManager, error);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@ -1438,7 +1575,8 @@ public class CommitRequestManagerTest {
retryBackoffMs,
retryBackoffMaxMs,
OptionalDouble.of(0),
metrics));
metrics,
metadata));
}
private ClientResponse buildOffsetFetchClientResponse(

View File

@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
@ -32,6 +33,7 @@ import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@ -45,7 +47,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -56,6 +57,7 @@ import java.util.regex.Pattern;
import java.util.stream.Stream;
import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -72,6 +74,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings("ClassDataAbstractionCoupling")
public class ApplicationEventProcessorTest {
private final Time time = new MockTime();
private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class);
@ -137,8 +140,6 @@ public class ApplicationEventProcessorTest {
return Stream.of(
Arguments.of(new PollEvent(100)),
Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
Arguments.of(new AsyncCommitEvent(new HashMap<>())),
Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
Arguments.of(new AssignmentChangeEvent(12345, 12345, Collections.emptyList())));
@ -202,14 +203,16 @@ public class ApplicationEventProcessorTest {
@Test
public void testSeekUnvalidatedEvent() {
TopicPartition tp = new TopicPartition("topic", 0);
Optional<Integer> offsetEpoch = Optional.of(1);
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
0, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());
0, offsetEpoch, Metadata.LeaderAndEpoch.noLeaderOrEpoch());
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, offsetEpoch);
setupProcessor(false);
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
processor.process(event);
verify(metadata).updateLastSeenEpochIfNewer(tp, offsetEpoch.get());
verify(metadata).currentLeader(tp);
verify(subscriptionState).seekUnvalidated(tp, position);
assertDoesNotThrow(() -> event.future().get());
@ -262,6 +265,27 @@ public class ApplicationEventProcessorTest {
assertDoesNotThrow(() -> event.future().get());
}
@Test
public void testFetchCommittedOffsetsEvent() {
TopicPartition tp0 = new TopicPartition("topic", 0);
TopicPartition tp1 = new TopicPartition("topic", 1);
TopicPartition tp2 = new TopicPartition("topic", 2);
Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2);
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = Map.of(
tp0, new OffsetAndMetadata(10L, Optional.of(2), ""),
tp1, new OffsetAndMetadata(15L, Optional.empty(), ""),
tp2, new OffsetAndMetadata(20L, Optional.of(3), "")
);
FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(partitions, 12345);
setupProcessor(true);
when(commitRequestManager.fetchOffsets(partitions, 12345)).thenReturn(CompletableFuture.completedFuture(topicPartitionOffsets));
processor.process(event);
verify(commitRequestManager).fetchOffsets(partitions, 12345);
assertEquals(topicPartitionOffsets, assertDoesNotThrow(() -> event.future().get()));
}
@Test
public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState() {
subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
@ -360,6 +384,87 @@ public class ApplicationEventProcessorTest {
assertDoesNotThrow(() -> event2.future().get());
}
@ParameterizedTest
@MethodSource("offsetsGenerator")
public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
setupProcessor(true);
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345);
processor.process(event);
verify(commitRequestManager).commitSync(offsets, 12345);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
assertEquals(offsets.orElse(Map.of()), committedOffsets);
}
@Test
public void testSyncCommitEventWithoutCommitRequestManager() {
SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
setupProcessor(false);
processor.process(event);
assertFutureThrows(event.future(), KafkaException.class);
}
@Test
public void testSyncCommitEventWithException() {
SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
setupProcessor(true);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
future.completeExceptionally(new IllegalStateException());
doReturn(future).when(commitRequestManager).commitSync(any(), anyLong());
processor.process(event);
verify(commitRequestManager).commitSync(Optional.empty(), 12345);
assertFutureThrows(event.future(), IllegalStateException.class);
}
@ParameterizedTest
@MethodSource("offsetsGenerator")
public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
AsyncCommitEvent event = new AsyncCommitEvent(offsets);
setupProcessor(true);
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
processor.process(event);
verify(commitRequestManager).commitAsync(offsets);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
assertEquals(offsets.orElse(Map.of()), committedOffsets);
}
@Test
public void testAsyncCommitEventWithoutCommitRequestManager() {
AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
setupProcessor(false);
processor.process(event);
assertFutureThrows(event.future(), KafkaException.class);
}
@Test
public void testAsyncCommitEventWithException() {
AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
setupProcessor(true);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
future.completeExceptionally(new IllegalStateException());
doReturn(future).when(commitRequestManager).commitAsync(any());
processor.process(event);
verify(commitRequestManager).commitAsync(Optional.empty());
assertFutureThrows(event.future(), IllegalStateException.class);
}
private static Stream<Arguments> offsetsGenerator() {
return Stream.of(
Arguments.of(Optional.empty()),
Arguments.of(Optional.of(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""))))
);
}
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}