KAFKA-18641: AsyncKafkaConsumer could lose records with auto offset commit (#18737)

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Jun Rao <jun@confluent.io>, Kirk True <ktrue@confluent.io>
This commit is contained in:
TengYao Chi 2025-02-21 01:11:01 +08:00 committed by GitHub
parent 1eecd02ce8
commit 709bfc506a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 359 additions and 245 deletions

View File

@ -105,7 +105,7 @@
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
<suppress checks="NPathComplexity"
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>

View File

@ -144,7 +144,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
/**
* If there is a reconciliation running (triggering commit, callbacks) for the
* assignmentReadyToReconcile. This will be true if {@link #maybeReconcile()} has been triggered
* assignmentReadyToReconcile. This will be true if {@link #maybeReconcile(boolean)} has been triggered
* after receiving a heartbeat response, or a metadata update.
*/
private boolean reconciliationInProgress;
@ -199,12 +199,15 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
private boolean isPollTimerExpired;
private final boolean autoCommitEnabled;
AbstractMembershipManager(String groupId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Logger log,
Time time,
RebalanceMetricsManager metricsManager) {
RebalanceMetricsManager metricsManager,
boolean autoCommitEnabled) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.subscriptions = subscriptions;
@ -216,6 +219,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
this.stateUpdatesListeners = new ArrayList<>();
this.time = time;
this.metricsManager = metricsManager;
this.autoCommitEnabled = autoCommitEnabled;
}
/**
@ -791,8 +795,16 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
* - Another reconciliation is already in progress.
* - There are topics that haven't been added to the current assignment yet, but all their topic IDs
* are missing from the target assignment.
*
* @param canCommit Controls whether reconciliation can proceed when auto-commit is enabled.
* Set to true only when the current offset positions are safe to commit.
* If false and auto-commit enabled, the reconciliation will be skipped.
*/
void maybeReconcile() {
public void maybeReconcile(boolean canCommit) {
if (state != MemberState.RECONCILING) {
return;
}
if (targetAssignmentReconciled()) {
log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
"current assignment.");
@ -818,6 +830,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
return;
}
if (autoCommitEnabled && !canCommit) return;
markReconciliationInProgress();
// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
@ -1347,7 +1360,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
/**
* @return If there is a reconciliation in process now. Note that reconciliation is triggered
* by a call to {@link #maybeReconcile()}. Visible for testing.
* by a call to {@link #maybeReconcile(boolean)}. Visible for testing.
*/
boolean reconciliationInProgress() {
return reconciliationInProgress;
@ -1383,9 +1396,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
* time-sensitive operations should be performed
*/
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
if (state == MemberState.RECONCILING) {
maybeReconcile();
}
maybeReconcile(false);
return NetworkClientDelegate.PollResult.EMPTY;
}

View File

@ -748,9 +748,14 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
}
do {
PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are still polling.
applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or reconciling new assignments
applicationEventHandler.add(event);
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
@ -818,7 +823,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
try {
AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((committedOffsets, throwable) -> {
if (throwable == null) {
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(committedOffsets);
}
@ -846,6 +850,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
}
applicationEventHandler.add(commitEvent);
// This blocks until the background thread retrieves allConsumed positions to commit if none were explicitly specified.
// This operation will ensure that the offsets to commit are not affected by fetches which may start after this
ConsumerUtils.getResult(commitEvent.offsetsReady(), defaultApiTimeoutMs.toMillis());
return commitEvent.future();
}

View File

@ -171,8 +171,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
}
/**
* Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any. The function will
* also try to autocommit the offsets, if feature is enabled.
* Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any.
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
@ -186,7 +185,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
return drainPendingOffsetCommitRequests();
}
maybeAutoCommitAsync();
if (!pendingRequests.hasUnsentRequests())
return EMPTY;
@ -264,7 +262,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* In that case, the next auto-commit request will be sent on the next call to poll, after a
* response for the in-flight is received.
*/
public void maybeAutoCommitAsync() {
private void maybeAutoCommitAsync() {
if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
OffsetCommitRequestState requestState = createOffsetCommitRequest(
subscriptions.allConsumed(),
@ -298,7 +296,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
/**
* Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval.
* This is used for committing offsets before revoking partitions. This will retry committing
* This is used for committing offsets before rebalance. This will retry committing
* the latest offsets until the request succeeds, fails with a fatal error, or the timeout
* expires. Note that:
* <ul>
@ -306,18 +304,18 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* including the member ID and latest member epoch received from the broker.</li>
* <li>Considers {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} as a fatal error, and will not
* retry it although the error extends RetriableException. The reason is that if a topic
* or partition is deleted, revocation would not finish in time since the auto commit would keep retrying.</li>
* or partition is deleted, rebalance would not finish in time since the auto commit would keep retrying.</li>
* </ul>
*
* Also note that this will generate a commit request even if there is another one in-flight,
* generated by the auto-commit on the interval logic, to ensure that the latest offsets are
* committed before revoking partitions.
* committed before rebalance.
*
* @return Future that will complete when the offsets are successfully committed. It will
* complete exceptionally if the commit fails with a non-retriable error, or if the retry
* timeout expires.
*/
public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long deadlineMs) {
public CompletableFuture<Void> maybeAutoCommitSyncBeforeRebalance(final long deadlineMs) {
if (!autoCommitEnabled()) {
return CompletableFuture.completedFuture(null);
}
@ -325,11 +323,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
CompletableFuture<Void> result = new CompletableFuture<>();
OffsetCommitRequestState requestState =
createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
autoCommitSyncBeforeRevocationWithRetries(requestState, result);
autoCommitSyncBeforeRebalanceWithRetries(requestState, result);
return result;
}
private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState requestAttempt,
private void autoCommitSyncBeforeRebalanceWithRetries(OffsetCommitRequestState requestAttempt,
CompletableFuture<Void> result) {
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt);
commitAttempt.whenComplete((committedOffsets, error) -> {
@ -338,10 +336,10 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
} else {
if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
if (requestAttempt.isExpired()) {
log.debug("Auto-commit sync before revocation timed out and won't be retried anymore");
log.debug("Auto-commit sync before rebalance timed out and won't be retried anymore");
result.completeExceptionally(maybeWrapAsTimeoutException(error));
} else if (error instanceof UnknownTopicOrPartitionException) {
log.debug("Auto-commit sync before revocation failed because topic or partition were deleted");
log.debug("Auto-commit sync before rebalance failed because topic or partition were deleted");
result.completeExceptionally(error);
} else {
// Make sure the auto-commit is retried with the latest offsets
@ -350,10 +348,10 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
error.getMessage());
requestAttempt.offsets = subscriptions.allConsumed();
requestAttempt.resetFuture();
autoCommitSyncBeforeRevocationWithRetries(requestAttempt, result);
autoCommitSyncBeforeRebalanceWithRetries(requestAttempt, result);
}
} else {
log.debug("Auto-commit sync before revocation failed with non-retriable error", error);
log.debug("Auto-commit sync before rebalance failed with non-retriable error", error);
result.completeExceptionally(error);
}
}
@ -388,14 +386,13 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* exceptionally depending on the response. If the request fails with a retriable error, the
* future will be completed with a {@link RetriableCommitFailedException}.
*/
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
if (commitOffsets.isEmpty()) {
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty()) {
log.debug("Skipping commit of empty offsets");
return CompletableFuture.completedFuture(Map.of());
}
maybeUpdateLastSeenEpochIfNewer(commitOffsets);
OffsetCommitRequestState commitRequest = createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
maybeUpdateLastSeenEpochIfNewer(offsets);
OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Long.MAX_VALUE);
pendingRequests.addOffsetCommitRequest(commitRequest);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> asyncCommitResult = new CompletableFuture<>();
@ -403,7 +400,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
asyncCommitResult.complete(commitOffsets);
asyncCommitResult.complete(offsets);
}
});
return asyncCommitResult;
@ -417,15 +414,14 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* an expected retriable error.
* @return Future that will complete when a successful response
*/
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
final long deadlineMs) {
Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
if (commitOffsets.isEmpty()) {
if (offsets.isEmpty()) {
return CompletableFuture.completedFuture(Map.of());
}
maybeUpdateLastSeenEpochIfNewer(commitOffsets);
maybeUpdateLastSeenEpochIfNewer(offsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
OffsetCommitRequestState requestState = createOffsetCommitRequest(commitOffsets, deadlineMs);
OffsetCommitRequestState requestState = createOffsetCommitRequest(offsets, deadlineMs);
commitSyncWithRetries(requestState, result);
return result;
}
@ -566,7 +562,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
return error instanceof StaleMemberEpochException && memberInfo.memberEpoch.isPresent();
}
public void updateAutoCommitTimer(final long currentTimeMs) {
private void updateAutoCommitTimer(final long currentTimeMs) {
this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs));
}
@ -637,6 +633,24 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
});
}
/**
* This is a non-blocking method to update timer and trigger async auto-commit.
* <p>
* This method performs two main tasks:
* <ol>
* <li>Updates the internal timer with the current time.</li>
* <li>Initiate an asynchronous auto-commit operation for all consumed positions if needed.</li>
* </ol>
*
* @param currentTimeMs the current timestamp in millisecond
* @see CommitRequestManager#updateAutoCommitTimer(long)
* @see CommitRequestManager#maybeAutoCommitAsync()
*/
public void updateTimerAndMaybeCommit(final long currentTimeMs) {
updateAutoCommitTimer(currentTimeMs);
maybeAutoCommitAsync();
}
class OffsetCommitRequestState extends RetriableRequestState {
private Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;

View File

@ -123,8 +123,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
private final Optional<String> serverAssignor;
/**
* Manager to perform commit requests needed before revoking partitions (if auto-commit is
* enabled)
* Manager to perform commit requests needed before rebalance (if auto-commit is enabled)
*/
private final CommitRequestManager commitRequestManager;
@ -145,7 +144,8 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
LogContext logContext,
BackgroundEventHandler backgroundEventHandler,
Time time,
Metrics metrics) {
Metrics metrics,
boolean autoCommitEnabled) {
this(groupId,
groupInstanceId,
rebalanceTimeoutMs,
@ -156,7 +156,8 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
logContext,
backgroundEventHandler,
time,
new ConsumerRebalanceMetricsManager(metrics));
new ConsumerRebalanceMetricsManager(metrics),
autoCommitEnabled);
}
// Visible for testing
@ -170,13 +171,15 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
LogContext logContext,
BackgroundEventHandler backgroundEventHandler,
Time time,
RebalanceMetricsManager metricsManager) {
RebalanceMetricsManager metricsManager,
boolean autoCommitEnabled) {
super(groupId,
subscriptions,
metadata,
logContext.logger(ConsumerMembershipManager.class),
time,
metricsManager);
metricsManager,
autoCommitEnabled);
this.groupInstanceId = groupInstanceId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.serverAssignor = serverAssignor;
@ -252,7 +255,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
// best effort to commit the offsets in the case where the epoch might have changed while
// the current reconciliation is in process. Note this is using the rebalance timeout as
// it is the limit enforced by the broker to complete the reconciliation process.
return commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));
return commitRequestManager.maybeAutoCommitSyncBeforeRebalance(getDeadlineMsForTimeout(rebalanceTimeoutMs));
}
/**

View File

@ -217,7 +217,8 @@ public class RequestManagers implements Closeable {
logContext,
backgroundEventHandler,
time,
metrics);
metrics,
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
// Update the group member ID label in the client telemetry reporter.
// According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process.

View File

@ -106,7 +106,8 @@ public class ShareMembershipManager extends AbstractMembershipManager<ShareGroup
metadata,
logContext.logger(ShareMembershipManager.class),
time,
metricsManager);
metricsManager,
false);
this.rackId = rackId;
}

View File

@ -206,13 +206,25 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
private void process(final PollEvent event) {
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
// as we're processing before any new fetching starts in the app thread
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
// all commit request generation points have been passed,
// so it's safe to notify the app thread could proceed and start fetching
event.markReconcileAndAutoCommitComplete();
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
} else {
// safe to unblock - no auto-commit risk here:
// 1. commitRequestManager is not present
// 2. shareConsumer has no auto-commit mechanism
event.markReconcileAndAutoCommitComplete();
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
@ -234,7 +246,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
try {
CommitRequestManager manager = requestManagers.commitRequestManager.get();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitAsync(event.offsets());
Map<TopicPartition, OffsetAndMetadata> offsets = event.offsets().orElseGet(subscriptions::allConsumed);
event.markOffsetsReady();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitAsync(offsets);
future.whenComplete(complete(event.future()));
} catch (Exception e) {
event.future().completeExceptionally(e);
@ -250,7 +264,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
try {
CommitRequestManager manager = requestManagers.commitRequestManager.get();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(event.offsets(), event.deadlineMs());
Map<TopicPartition, OffsetAndMetadata> offsets = event.offsets().orElseGet(subscriptions::allConsumed);
event.markOffsetsReady();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(offsets, event.deadlineMs());
future.whenComplete(complete(event.future()));
} catch (Exception e) {
event.future().completeExceptionally(e);
@ -275,8 +291,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private void process(final AssignmentChangeEvent event) {
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager manager = requestManagers.commitRequestManager.get();
manager.updateAutoCommitTimer(event.currentTimeMs());
manager.maybeAutoCommitAsync();
manager.updateTimerAndMaybeCommit(event.currentTimeMs());
}
log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public abstract class CommitEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
@ -30,6 +31,13 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Map<TopicP
*/
private final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets;
/**
* Future that completes when allConsumed offsets have been calculated.
* The app thread waits for this future before returning control to ensure
* the offsets to be committed are up-to-date.
*/
protected final CompletableFuture<Void> offsetsReady = new CompletableFuture<>();
protected CommitEvent(final Type type, final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, final long deadlineMs) {
super(type, deadlineMs);
this.offsets = validate(offsets);
@ -57,6 +65,14 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Map<TopicP
return offsets;
}
public CompletableFuture<Void> offsetsReady() {
return offsetsReady;
}
public void markOffsetsReady() {
offsetsReady.complete(null);
}
@Override
protected String toStringBase() {
return super.toStringBase() + ", offsets=" + offsets;

View File

@ -16,10 +16,26 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import java.util.concurrent.CompletableFuture;
public class PollEvent extends ApplicationEvent {
private final long pollTimeMs;
/**
* A future that represents the completion of reconciliation and auto-commit
* processing.
* This future is completed when all commit request generation points have
* been passed, including:
* <ul>
* <li>auto-commit on rebalance</li>
* <li>auto-commit on the interval</li>
* </ul>
* Once completed, it signals that it's safe for the consumer to proceed with
* fetching new records.
*/
private final CompletableFuture<Void> reconcileAndAutoCommit = new CompletableFuture<>();
public PollEvent(final long pollTimeMs) {
super(Type.POLL);
this.pollTimeMs = pollTimeMs;
@ -29,6 +45,14 @@ public class PollEvent extends ApplicationEvent {
return pollTimeMs;
}
public CompletableFuture<Void> reconcileAndAutoCommit() {
return reconcileAndAutoCommit;
}
public void markReconcileAndAutoCommitComplete() {
reconcileAndAutoCommit.complete(null);
}
@Override
public String toStringBase() {
return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;

View File

@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
@ -294,6 +295,7 @@ public class AsyncKafkaConsumerTest {
offsets.put(t0, new OffsetAndMetadata(10L));
offsets.put(t1, new OffsetAndMetadata(20L));
markOffsetsReadyForCommitEvent();
consumer.commitAsync(offsets, null);
final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
@ -395,6 +397,7 @@ public class AsyncKafkaConsumerTest {
consumer.wakeup();
markReconcileAndAutoCommitCompleteForPollEvent();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@ -414,6 +417,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@ -437,6 +441,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call
@ -473,6 +478,7 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
assertTrue(callbackExecuted.get());
}
@ -494,6 +500,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@ -585,6 +592,7 @@ public class AsyncKafkaConsumerTest {
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
markOffsetsReadyForCommitEvent();
consumer.commitAsync();
CompletableApplicationEvent<Void> event = getLastEnqueuedEvent();
@ -618,6 +626,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
markReconcileAndAutoCommitCompleteForPollEvent();
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
callback,
null);
@ -733,6 +742,7 @@ public class AsyncKafkaConsumerTest {
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
markOffsetsReadyForCommitEvent();
consumer.commitSyncAllConsumed(time.timer(100));
ArgumentCaptor<SyncCommitEvent> eventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
@ -1027,6 +1037,7 @@ public class AsyncKafkaConsumerTest {
ApplicationEvent event = invocation.getArgument(0);
if (event instanceof SyncCommitEvent) {
capturedEvent.set((SyncCommitEvent) event);
((SyncCommitEvent) event).markOffsetsReady();
}
return null;
}).when(applicationEventHandler).add(any());
@ -1051,7 +1062,9 @@ public class AsyncKafkaConsumerTest {
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
markOffsetsReadyForCommitEvent();
consumer.commitAsync();
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ofMillis(10)));
assertInstanceOf(TimeoutException.class, e.getCause());
}
@ -1392,6 +1405,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(e);
}
markReconcileAndAutoCommitCompleteForPollEvent();
// This will trigger the background event queue to process our background event message.
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
if (expectedException.isPresent()) {
@ -1461,6 +1475,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
assertEquals(expectedException.getMessage(), exception.getMessage());
@ -1479,6 +1494,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent2);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
assertEquals(expectedException1.getMessage(), exception.getMessage());
@ -1511,6 +1527,7 @@ public class AsyncKafkaConsumerTest {
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@ -1534,6 +1551,7 @@ public class AsyncKafkaConsumerTest {
final Properties props = requiredConsumerConfigAndGroupId("consumerGroupA");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@ -1562,6 +1580,7 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1"));
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
@ -1580,7 +1599,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(new TopicPartition("t1", 1)));
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, atLeast(1))
@ -1617,6 +1636,7 @@ public class AsyncKafkaConsumerTest {
).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
// And then poll for up to 10000ms, which should return 2 records without timing out
ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000));
assertEquals(2, returnedRecords.count());
@ -1720,6 +1740,7 @@ public class AsyncKafkaConsumerTest {
// interrupt the thread and call poll
try {
Thread.currentThread().interrupt();
markReconcileAndAutoCommitCompleteForPollEvent();
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
} finally {
// clear interrupted state again since this thread may be reused by JUnit
@ -1751,6 +1772,7 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList("topic"));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
verify(backgroundEventReaper).reap(time.milliseconds());
}
@ -1813,6 +1835,7 @@ public class AsyncKafkaConsumerTest {
completeUnsubscribeApplicationEventSuccessfully();
consumer.assign(singleton(new TopicPartition("topic1", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
@ -1975,6 +1998,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitAsyncApplicationEventExceptionally(Exception ex) {
doAnswer(invocation -> {
AsyncCommitEvent event = invocation.getArgument(0);
event.markOffsetsReady();
event.future().completeExceptionally(ex);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
@ -1983,6 +2007,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitSyncApplicationEventExceptionally(Exception ex) {
doAnswer(invocation -> {
SyncCommitEvent event = invocation.getArgument(0);
event.markOffsetsReady();
event.future().completeExceptionally(ex);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
@ -1995,6 +2020,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitAsyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
AsyncCommitEvent event = invocation.getArgument(0);
event.markOffsetsReady();
event.future().complete(null);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
@ -2003,6 +2029,7 @@ public class AsyncKafkaConsumerTest {
private void completeCommitSyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
SyncCommitEvent event = invocation.getArgument(0);
event.markOffsetsReady();
event.future().complete(null);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
@ -2090,4 +2117,20 @@ public class AsyncKafkaConsumerTest {
// Invokes callback
consumer.commitAsync();
}
private void markOffsetsReadyForCommitEvent() {
doAnswer(invocation -> {
CommitEvent event = invocation.getArgument(0);
event.markOffsetsReady();
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitEvent.class));
}
private void markReconcileAndAutoCommitCompleteForPollEvent() {
doAnswer(invocation -> {
PollEvent event = invocation.getArgument(0);
event.markReconcileAndAutoCommitComplete();
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
}
}

View File

@ -89,10 +89,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -184,7 +184,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.commitAsync(offsets);
assertPoll(false, 0, commitRequestManager);
}
@ -195,7 +195,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.commitAsync(offsets);
assertPoll(false, 0, commitRequestManager);
assertPoll(true, 1, commitRequestManager);
}
@ -207,7 +207,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.commitAsync(offsets);
assertPoll(1, commitRequestManager);
}
@ -219,9 +219,9 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
assertPoll(0, commitRequestManager);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"t1",
@ -248,9 +248,9 @@ public class CommitRequestManagerTest {
// Add the requests to the CommitRequestManager and store their futures
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
commitManager.commitSync(Optional.of(offsets1), deadlineMs);
commitManager.commitSync(offsets1, deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs);
commitManager.commitSync(Optional.of(offsets2), deadlineMs);
commitManager.commitSync(offsets2, deadlineMs);
commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), deadlineMs);
// Poll the CommitRequestManager and verify that the inflightOffsetFetches size is correct
@ -283,7 +283,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.commitAsync(offsets);
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
assertEquals(1, commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
@ -300,7 +300,7 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(false, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitSync(
Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
offsets, time.milliseconds() + defaultApiTimeoutMs);
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
@ -319,43 +319,17 @@ public class CommitRequestManagerTest {
@Test
public void testCommitSyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition tp = new TopicPartition("topic", 1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), "");
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
doReturn(offsets).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(false, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitSync(
Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"topic",
1,
(short) 1,
Errors.NONE)));
verify(subscriptionState).allConsumed();
verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
Collections.emptyMap(), time.milliseconds() + defaultApiTimeoutMs);
assertTrue(future.isDone());
assertEquals(offsets, commitOffsets);
}
assertEquals(0, commitRequestManager.unsentOffsetCommitRequests().size());
assertPoll(0, commitRequestManager);
@Test
public void testCommitSyncWithEmptyAllConsumedOffsets() {
subscriptionState = mock(SubscriptionState.class);
doReturn(Map.of()).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitSync(
Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
verify(subscriptionState).allConsumed();
verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertTrue(future.isDone());
assertTrue(commitOffsets.isEmpty());
assertEquals(Collections.emptyMap(), commitOffsets);
}
@Test
@ -367,7 +341,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.of(offsets));
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(offsets);
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
@ -386,39 +360,12 @@ public class CommitRequestManagerTest {
@Test
public void testCommitAsyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition tp = new TopicPartition("topic", 1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), "");
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
doReturn(offsets).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty());
assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size());
List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager);
pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
"topic",
1,
(short) 1,
Errors.NONE)));
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Collections.emptyMap());
verify(subscriptionState).allConsumed();
verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
assertTrue(future.isDone());
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertEquals(offsets, commitOffsets);
}
@Test
public void testCommitAsyncWithEmptyAllConsumedOffsets() {
subscriptionState = mock(SubscriptionState.class);
doReturn(Map.of()).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty());
verify(subscriptionState).allConsumed();
assertTrue(future.isDone());
assertPoll(0, commitRequestManager);
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertTrue(commitOffsets.isEmpty());
}
@ -433,7 +380,7 @@ public class CommitRequestManagerTest {
subscriptionState.assignFromUser(Collections.singleton(tp));
subscriptionState.seek(tp, 100);
time.sleep(commitInterval);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures = assertPoll(1, commitRequestManager);
// Complete the autocommit request exceptionally. It should fail right away, without retry.
futures.get(0).onComplete(mockOffsetCommitResponse(
@ -446,14 +393,14 @@ public class CommitRequestManagerTest {
// (making sure we wait for the backoff, to check that the failed request is not being
// retried).
time.sleep(retryBackoffMs);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(0, commitRequestManager);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
// Only when polling after the auto-commit interval, a new auto-commit request should be
// generated.
time.sleep(commitInterval);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
futures = assertPoll(1, commitRequestManager);
assertEmptyPendingRequests(commitRequestManager);
futures.get(0).onComplete(mockOffsetCommitResponse(
@ -475,7 +422,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, error, commitResult);
// We expect that request should have been retried on this sync commit.
@ -501,7 +448,7 @@ public class CommitRequestManagerTest {
new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.UNKNOWN_MEMBER_ID);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
@ -521,7 +468,7 @@ public class CommitRequestManagerTest {
// Send commit request expected to be retried on retriable errors
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(
Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
offsets, time.milliseconds() + defaultApiTimeoutMs);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@ -540,7 +487,6 @@ public class CommitRequestManagerTest {
public void testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheInterval() {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
TopicPartition t1p = new TopicPartition("topic1", 0);
subscriptionState.assignFromUser(singleton(t1p));
@ -548,7 +494,7 @@ public class CommitRequestManagerTest {
// Async commit on the interval fails with fatal stale epoch and just resets the timer to
// the interval
commitRequestManager.maybeAutoCommitAsync();
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
verify(commitRequestManager).resetAutoCommitTimer();
@ -557,7 +503,7 @@ public class CommitRequestManagerTest {
assertEquals(0, res.unsentRequests.size(), "No request should be generated until the " +
"interval expires");
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -573,7 +519,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Async commit that won't be retried.
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(offsets);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -595,11 +541,11 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures = assertPoll(1, commitRequestManager);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
// We want to make sure we don't resend autocommit if the previous request has not been
// completed, even if the interval expired
assertPoll(0, commitRequestManager);
@ -607,6 +553,7 @@ public class CommitRequestManagerTest {
// complete the unsent request and re-poll
futures.get(0).onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>())));
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(1, commitRequestManager);
}
@ -620,7 +567,7 @@ public class CommitRequestManagerTest {
// Send auto-commit request on the interval.
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
NetworkClientDelegate.FutureCompletionHandler autoCommitOnInterval =
@ -629,7 +576,7 @@ public class CommitRequestManagerTest {
// Another auto-commit request should be sent if a revocation happens, even if an
// auto-commit on the interval is in-flight.
CompletableFuture<Void> autoCommitBeforeRevocation =
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(200);
commitRequestManager.maybeAutoCommitSyncBeforeRebalance(200);
assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetCommits.size());
// Receive response for initial auto-commit on interval
@ -646,7 +593,7 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures = assertPoll(1, commitRequestManager);
// complete the unsent request to trigger interceptor
@ -664,7 +611,7 @@ public class CommitRequestManagerTest {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures = assertPoll(1, commitRequestManager);
// complete the unsent request to trigger interceptor
@ -678,8 +625,7 @@ public class CommitRequestManagerTest {
public void testAutoCommitEmptyOffsetsDoesNotGenerateRequest() {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.maybeAutoCommitAsync();
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
verify(commitRequestManager).resetAutoCommitTimer();
}
@ -692,15 +638,13 @@ public class CommitRequestManagerTest {
// Auto-commit of empty offsets
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.maybeAutoCommitAsync();
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
// Next auto-commit consumed offsets (not empty). Should generate a request, ensuring
// that the previous auto-commit of empty did not leave the inflight request flag on
subscriptionState.seek(t1p, 100);
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.maybeAutoCommitAsync();
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetCommits.size());
verify(commitRequestManager, times(2)).resetAutoCommitTimer();
@ -716,8 +660,7 @@ public class CommitRequestManagerTest {
// Send auto-commit request that will remain in-flight without a response
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.maybeAutoCommitAsync();
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures = assertPoll(1, commitRequestManager);
assertEquals(1, futures.size());
NetworkClientDelegate.FutureCompletionHandler inflightCommitResult = futures.get(0);
@ -728,8 +671,7 @@ public class CommitRequestManagerTest {
// should not be reset either, to ensure that the next auto-commit is sent out as soon as
// the inflight receives a response.
time.sleep(100);
commitRequestManager.updateAutoCommitTimer(time.milliseconds());
commitRequestManager.maybeAutoCommitAsync();
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(0, commitRequestManager);
verify(commitRequestManager, never()).resetAutoCommitTimer();
@ -737,6 +679,7 @@ public class CommitRequestManagerTest {
// polling the manager.
inflightCommitResult.onComplete(
mockOffsetCommitResponse(t1p.topic(), t1p.partition(), (short) 1, Errors.NONE));
commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
assertPoll(1, commitRequestManager);
}
@ -924,7 +867,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
// Send async commit (not expected to be retried).
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(offsets);
completeOffsetCommitRequestWithError(commitRequestManager, error);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@ -949,7 +892,7 @@ public class CommitRequestManagerTest {
// Send sync offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT);
// Request retried after backoff, and fails with retriable again. Should not complete yet
@ -984,7 +927,7 @@ public class CommitRequestManagerTest {
// Send offset commit request that fails with retriable error.
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
completeOffsetCommitRequestWithError(commitRequestManager, error);
// Sleep to expire the request timeout. Request should fail on the next poll with a
@ -1011,7 +954,7 @@ public class CommitRequestManagerTest {
// Send async commit request that fails with retriable error (not expected to be retried).
Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(Optional.of(offsets));
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = commitRequestManager.commitAsync(offsets);
completeOffsetCommitRequestWithError(commitRequestManager, retriableError);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
@ -1036,7 +979,7 @@ public class CommitRequestManagerTest {
offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
commitRequestManager.commitSync(Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
commitRequestManager.commitSync(offsets, time.milliseconds() + defaultApiTimeoutMs);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -1061,7 +1004,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
commitRequestManager.commitSync(Optional.of(offsets), deadlineMs);
commitRequestManager.commitSync(offsets, deadlineMs);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException());
@ -1217,7 +1160,7 @@ public class CommitRequestManagerTest {
long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
// Send commit request expected to be retried on STALE_MEMBER_EPOCH error while it does not expire
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(deadlineMs);
commitRequestManager.maybeAutoCommitSyncBeforeRebalance(deadlineMs);
int newEpoch = 8;
String memberId = "member1";
@ -1271,7 +1214,7 @@ public class CommitRequestManagerTest {
// Send auto commit to revoke partitions, expected to be retried on STALE_MEMBER_EPOCH
// with the latest epochs received (using long deadline to avoid expiring the request
// while retrying with the new epochs)
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(Long.MAX_VALUE);
commitRequestManager.maybeAutoCommitSyncBeforeRebalance(Long.MAX_VALUE);
int initialEpoch = 1;
String memberId = "member1";
@ -1317,7 +1260,7 @@ public class CommitRequestManagerTest {
new OffsetAndMetadata(0));
long commitCreationTimeMs = time.milliseconds();
commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.commitAsync(offsets);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@ -1491,7 +1434,7 @@ public class CommitRequestManagerTest {
Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
commitRequestManager.commitAsync(Optional.of(offsets));
commitRequestManager.commitAsync(offsets);
commitRequestManager.signalClose();
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());

View File

@ -126,7 +126,7 @@ public class ConsumerMembershipManagerTest {
metrics = new Metrics(time);
rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics);
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
}
private ConsumerMembershipManager createMembershipManagerJoiningGroup() {
@ -143,7 +143,7 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager));
backgroundEventHandler, time, rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
return manager;
}
@ -153,7 +153,7 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager,
metadata, LOG_CONTEXT, backgroundEventHandler, time, rebalanceMetricsManager));
metadata, LOG_CONTEXT, backgroundEventHandler, time, rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
manager.transitionToJoining();
return manager;
@ -232,7 +232,7 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager membershipManager = new ConsumerMembershipManager(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager);
backgroundEventHandler, time, rebalanceMetricsManager, true);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();
@ -309,7 +309,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.onHeartbeatSuccess(heartbeatResponse);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@ -608,8 +608,8 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
CompletableFuture<Void> commitResult = new CompletableFuture<>();
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
membershipManager.poll(time.milliseconds());
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
membershipManager.maybeReconcile(false);
// Get fenced, commit completes
membershipManager.transitionToFenced();
@ -627,7 +627,7 @@ public class ConsumerMembershipManagerTest {
// We have to reconcile & ack the assignment again
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@ -663,7 +663,7 @@ public class ConsumerMembershipManagerTest {
// stay in RECONCILING state, since an unresolved topic is assigned
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
);
@ -679,7 +679,7 @@ public class ConsumerMembershipManagerTest {
// Receive original assignment again - full reconciliation not triggered but assignment is acked again
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(false);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
verifyReconciliationNotTriggered(membershipManager);
assertEquals(Collections.singletonMap(topic1, mkSortedSet(0)), membershipManager.currentAssignment().partitions);
@ -751,8 +751,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(topic2Assignment, membershipManager.topicPartitionsAwaitingReconciliation());
// Next reconciliation triggered in poll
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
assertEquals(Collections.emptySet(), membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState).assignFromSubscribedAwaitingCallback(topicPartitions(topic2Assignment, topic2Metadata), topicPartitions(topic2Assignment, topic2Metadata));
@ -797,7 +796,7 @@ public class ConsumerMembershipManagerTest {
);
receiveAssignment(newAssignment, membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(false);
verifyReconciliationNotTriggered(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
@ -819,8 +818,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.RECONCILING, membershipManager.state());
clearInvocations(membershipManager, commitRequestManager);
// Next poll should trigger final reconciliation
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager, Arrays.asList(topicId1Partition0, topicId2Partition0));
}
@ -858,7 +856,7 @@ public class ConsumerMembershipManagerTest {
);
receiveAssignment(newAssignment, membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(false);
// No full reconciliation triggered, but assignment needs to be acknowledged.
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -881,7 +879,7 @@ public class ConsumerMembershipManagerTest {
);
when(metadata.topicNames()).thenReturn(fullTopicMetadata);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager, Arrays.asList(topicId1Partition0, topicId2Partition0));
}
@ -973,7 +971,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions = Arrays.asList(
new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
@ -1207,7 +1205,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0));
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -1249,7 +1247,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager, Collections.emptyList());
@ -1273,7 +1271,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(false);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.RECONCILING, membershipManager.state());
@ -1301,7 +1299,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
// Assignment should have been reconciled.
Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 1));
@ -1363,7 +1361,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions = topicIdPartitions(topicId, topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager, assignedPartitions);
@ -1382,7 +1380,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1, 2), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions = new ArrayList<>();
assignedPartitions.add(ownedPartition);
@ -1403,7 +1401,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggeredAndCompleted(membershipManager, expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -1436,7 +1434,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
testRevocationOfAllPartitionsCompleted(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
@ -1452,8 +1450,8 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
membershipManager.poll(time.milliseconds());
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
membershipManager.maybeReconcile(true);
// Member stays in RECONCILING while the commit request hasn't completed.
assertEquals(MemberState.RECONCILING, membershipManager.state());
@ -1465,7 +1463,7 @@ public class ConsumerMembershipManagerTest {
commitResult.complete(null);
InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRebalance(anyLong());
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
testRevocationOfAllPartitionsCompleted(membershipManager);
@ -1481,7 +1479,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
// Member stays in RECONCILING while the commit request hasn't completed.
assertEquals(MemberState.RECONCILING, membershipManager.state());
@ -1512,7 +1510,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(1, 2), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
TreeSet<TopicPartition> expectedSet = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
expectedSet.add(new TopicPartition(topicName, 1));
@ -1546,8 +1544,7 @@ public class ConsumerMembershipManagerTest {
String topicName = "topic1";
mockTopicNameInMetadataCache(Collections.singletonMap(topicId, topicName), true);
// When the next poll is run, the member should re-trigger reconciliation
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
List<TopicIdPartition> expectedAssignmentReconciled = topicIdPartitions(topicId, topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager, expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -1576,7 +1573,7 @@ public class ConsumerMembershipManagerTest {
// Next poll is run, but metadata still without the unresolved topic in it. Should keep
// the unresolved and request update again.
when(metadata.topicNames()).thenReturn(Collections.emptyMap());
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(false);
verifyReconciliationNotTriggered(membershipManager);
assertEquals(Collections.singleton(topicId), membershipManager.topicsAwaitingReconciliation());
verify(metadata, times(2)).requestUpdate(anyBoolean());
@ -1594,7 +1591,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verify(subscriptionState).markPendingRevocation(Set.of());
// Member should complete reconciliation
@ -1618,7 +1615,7 @@ public class ConsumerMembershipManagerTest {
// Revoke one of the 2 partitions
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));
// Revocation should complete without requesting any metadata update given that the topic
@ -1671,7 +1668,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertTrue(membershipManager.reconciliationInProgress());
@ -1704,7 +1701,7 @@ public class ConsumerMembershipManagerTest {
// Step 5: receive an empty assignment, which means we should call revoke
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1));
receiveEmptyAssignment(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertTrue(membershipManager.reconciliationInProgress());
@ -1767,7 +1764,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(topicIdPartitionsMap(topicId, 0), membershipManager.currentAssignment().partitions);
@ -1826,7 +1823,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(topicIdPartitionsMap(topicId, 0), membershipManager.currentAssignment().partitions);
@ -1882,7 +1879,7 @@ public class ConsumerMembershipManagerTest {
mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned, partitionAdded,
new CounterConsumerRebalanceListener(), membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions, addedPartitions);
@ -1914,7 +1911,7 @@ public class ConsumerMembershipManagerTest {
mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned, partitionAdded,
listener, membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions, addedPartitions);
@ -2226,7 +2223,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@ -2248,7 +2245,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(0, listener.lostCount());
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
performCallback(
membershipManager,
invoker,
@ -2285,7 +2282,7 @@ public class ConsumerMembershipManagerTest {
long reconciliationDurationMs = 1234;
time.sleep(reconciliationDurationMs);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
// Complete commit request to complete the callback invocation
commitResult.complete(null);
@ -2317,7 +2314,7 @@ public class ConsumerMembershipManagerTest {
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
// assign partitions
performCallback(
@ -2338,7 +2335,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1));
receiveAssignment(topicId, Collections.singletonList(2), membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
performCallback(
membershipManager,
@ -2396,6 +2393,14 @@ public class ConsumerMembershipManagerTest {
assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
}
@Test
public void testPollMustCallsMaybeReconcileWithFalse() {
ConsumerMembershipManager membershipManager = createMemberInStableState();
membershipManager.poll(time.milliseconds());
verify(membershipManager).maybeReconcile(false);
verifyReconciliationNotTriggered(membershipManager);
}
private Object getMetricValue(Metrics metrics, MetricName name) {
return metrics.metrics().get(name).metricValue();
}
@ -2409,7 +2414,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, partitions, membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
List<TopicIdPartition> assignedPartitions =
partitions.stream().map(tp -> new TopicIdPartition(topicId,
@ -2424,7 +2429,7 @@ public class ConsumerMembershipManagerTest {
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
@ -2443,7 +2448,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, partitions, membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
@ -2464,7 +2469,7 @@ public class ConsumerMembershipManagerTest {
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
receiveAssignment(topicId, partitions, membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
@ -2489,7 +2494,7 @@ public class ConsumerMembershipManagerTest {
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
receiveAssignment(topicId, Collections.singletonList(newPartition), membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
clearInvocations(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
@ -2552,7 +2557,7 @@ public class ConsumerMembershipManagerTest {
if (withAutoCommit) {
when(commitRequestManager.autoCommitEnabled()).thenReturn(true);
CompletableFuture<Void> commitResult = new CompletableFuture<>();
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(commitResult);
return commitResult;
} else {
return CompletableFuture.completedFuture(null);
@ -2634,7 +2639,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatSuccess(heartbeatResponse);
if (triggerReconciliation) {
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(anyCollection(), anyCollection());
} else {
verify(subscriptionState, never()).assignFromSubscribed(anyCollection());
@ -2655,7 +2660,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@ -2722,7 +2727,7 @@ public class ConsumerMembershipManagerTest {
// Stale reconciliation should have been aborted and a new one should be triggered on the next poll.
assertFalse(membershipManager.reconciliationInProgress());
clearInvocations(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.maybeReconcile(true);
verify(membershipManager).markReconciliationInProgress();
}

View File

@ -63,6 +63,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@ -168,11 +169,9 @@ public class ApplicationEventProcessorTest {
doReturn(true).when(subscriptionState).assignFromUser(Collections.singleton(tp));
processor.process(event);
if (withGroupId) {
verify(commitRequestManager).updateAutoCommitTimer(currentTimeMs);
verify(commitRequestManager).maybeAutoCommitAsync();
verify(commitRequestManager).updateTimerAndMaybeCommit(currentTimeMs);
} else {
verify(commitRequestManager, never()).updateAutoCommitTimer(currentTimeMs);
verify(commitRequestManager, never()).maybeAutoCommitAsync();
verify(commitRequestManager, never()).updateTimerAndMaybeCommit(currentTimeMs);
}
verify(metadata).requestUpdateForNewTopics();
verify(subscriptionState).assignFromUser(Collections.singleton(tp));
@ -241,7 +240,8 @@ public class ApplicationEventProcessorTest {
setupProcessor(true);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
processor.process(event);
verify(commitRequestManager).updateAutoCommitTimer(12345);
assertTrue(event.reconcileAndAutoCommit().isDone());
verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
verify(membershipManager).onConsumerPoll();
verify(heartbeatRequestManager).resetPollTimer(12345);
}
@ -442,18 +442,35 @@ public class ApplicationEventProcessorTest {
assertEquals(mixedSubscriptionError, thrown);
}
@ParameterizedTest
@MethodSource("offsetsGenerator")
public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
@Test
public void testSyncCommitEventWithEmptyOffsets() {
Map<TopicPartition, OffsetAndMetadata> allConsumed =
Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""));
SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
setupProcessor(true);
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345);
doReturn(allConsumed).when(subscriptionState).allConsumed();
doReturn(CompletableFuture.completedFuture(allConsumed)).when(commitRequestManager).commitSync(allConsumed, 12345);
processor.process(event);
verify(commitRequestManager).commitSync(allConsumed, 12345);
assertTrue(event.offsetsReady.isDone());
Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
assertEquals(allConsumed, committedOffsets);
}
@Test
public void testSyncCommitEvent() {
Map<TopicPartition, OffsetAndMetadata> offsets =
Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""));
SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), 12345);
setupProcessor(true);
doReturn(CompletableFuture.completedFuture(offsets)).when(commitRequestManager).commitSync(offsets, 12345);
processor.process(event);
verify(commitRequestManager).commitSync(offsets, 12345);
assertTrue(event.offsetsReady.isDone());
Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
assertEquals(offsets.orElse(Map.of()), committedOffsets);
assertEquals(offsets, committedOffsets);
}
@Test
@ -475,22 +492,40 @@ public class ApplicationEventProcessorTest {
doReturn(future).when(commitRequestManager).commitSync(any(), anyLong());
processor.process(event);
verify(commitRequestManager).commitSync(Optional.empty(), 12345);
verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345);
assertTrue(event.offsetsReady.isDone());
assertFutureThrows(IllegalStateException.class, event.future());
}
@ParameterizedTest
@MethodSource("offsetsGenerator")
public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
AsyncCommitEvent event = new AsyncCommitEvent(offsets);
@Test
public void testAsyncCommitEventWithEmptyOffsets() {
Map<TopicPartition, OffsetAndMetadata> allConsumed =
Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""));
AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
setupProcessor(true);
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
doReturn(CompletableFuture.completedFuture(allConsumed)).when(commitRequestManager).commitAsync(allConsumed);
doReturn(allConsumed).when(subscriptionState).allConsumed();
processor.process(event);
verify(commitRequestManager).commitAsync(allConsumed);
assertTrue(event.offsetsReady.isDone());
Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
assertEquals(allConsumed, committedOffsets);
}
@Test
public void testAsyncCommitEvent() {
Map<TopicPartition, OffsetAndMetadata> offsets =
Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""));
AsyncCommitEvent event = new AsyncCommitEvent(Optional.of(offsets));
setupProcessor(true);
doReturn(CompletableFuture.completedFuture(offsets)).when(commitRequestManager).commitAsync(offsets);
processor.process(event);
verify(commitRequestManager).commitAsync(offsets);
assertTrue(event.offsetsReady.isDone());
Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
assertEquals(offsets.orElse(Map.of()), committedOffsets);
assertEquals(offsets, committedOffsets);
}
@Test
@ -507,22 +542,17 @@ public class ApplicationEventProcessorTest {
AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
setupProcessor(true);
doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
future.completeExceptionally(new IllegalStateException());
doReturn(future).when(commitRequestManager).commitAsync(any());
processor.process(event);
verify(commitRequestManager).commitAsync(Optional.empty());
verify(commitRequestManager).commitAsync(Collections.emptyMap());
assertTrue(event.offsetsReady.isDone());
assertFutureThrows(IllegalStateException.class, event.future());
}
private static Stream<Arguments> offsetsGenerator() {
return Stream.of(
Arguments.of(Optional.empty()),
Arguments.of(Optional.of(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10, Optional.of(1), ""))))
);
}
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}