mirror of https://github.com/apache/kafka.git
KAFKA-15974: Enforce that event processing respects user-provided timeout (#15640)

The intention of the CompletableApplicationEvent is for a Consumer to enqueue the event and then block, waiting for it to complete. The application thread will block for up to the duration of the timeout. This change introduces a consistent mechanism for expiring events based on their timeout values.

The CompletableEventReaper is a new class that tracks CompletableEvents that are enqueued. Both the application thread and the network I/O thread maintain their own reaper instances. The application thread tracks any CompletableBackgroundEvents that it receives, and the network I/O thread does the same with any CompletableApplicationEvents it receives. Each thread checks its tracked events, and if any have expired, the reaper invokes the event's CompletableFuture.completeExceptionally() method with a TimeoutException.

When the AsyncKafkaConsumer is closed, both threads invoke their respective reapers to cancel any unprocessed events in their queues. In this case, the reaper invokes each event's CompletableFuture.completeExceptionally() method with a CancellationException instead of a TimeoutException to differentiate the two cases.

The overall design for the expiration mechanism is captured on the Apache wiki, and the original issue (KAFKA-15848) has more background on the cause. Note: this change only handles event expiration; it does not cover network request expiration. That is handled in a follow-up Jira (KAFKA-16200) that builds atop this change.

This change also includes some minor refactoring of the EventProcessor and its implementations, allowing the event processor logic to focus on processing individual events rather than also handling batches of events.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Philip Nee <pnee@confluent.io>, Bruno Cadonna <cadonna@apache.org>
parent 028e7a06dc
commit a98c9be6b0
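The mechanics of the change are easier to see in miniature before reading the diff. The sketch below is illustrative only; the names are simplified stand-ins, not the real classes (those appear in the diff that follows). An event carries an absolute deadline, and a reaper fails the future of any tracked event whose deadline has passed, which unblocks the waiting application thread with a TimeoutException.

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeoutException;

    // Toy model of the expiration mechanism (not the real Kafka classes).
    final class ReaperSketch {

        // A tracked event pairs a future with an absolute wall-clock deadline.
        interface TimedEvent<T> {
            CompletableFuture<T> future();
            long deadlineMs(); // absolute expiration time, not a timeout
        }

        // Fail any incomplete event whose deadline has passed, then drop all
        // completed events so the tracker does not hold stale references.
        static void reap(List<TimedEvent<?>> tracked, long currentTimeMs) {
            for (TimedEvent<?> e : tracked) {
                if (!e.future().isDone() && currentTimeMs >= e.deadlineMs())
                    e.future().completeExceptionally(new TimeoutException("event expired"));
            }
            tracked.removeIf(e -> e.future().isDone());
        }
    }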
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

@@ -47,6 +47,8 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
 import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
@@ -100,6 +102,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -132,6 +135,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets;
+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 import static org.apache.kafka.common.utils.Utils.isBlank;
 import static org.apache.kafka.common.utils.Utils.swallow;
@@ -164,48 +168,14 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
      * <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
      * </ul>
      */
-    private class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
+    private class BackgroundEventProcessor implements EventProcessor<BackgroundEvent> {
 
-        private final ApplicationEventHandler applicationEventHandler;
         private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
 
-        public BackgroundEventProcessor(final LogContext logContext,
-                                        final BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                                        final ApplicationEventHandler applicationEventHandler,
-                                        final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker) {
-            super(logContext, backgroundEventQueue);
-            this.applicationEventHandler = applicationEventHandler;
+        public BackgroundEventProcessor(final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker) {
             this.rebalanceListenerInvoker = rebalanceListenerInvoker;
         }
 
-        /**
-         * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
-         * It is possible that {@link ErrorEvent an error}
-         * could occur when processing the events. In such cases, the processor will take a reference to the first
-         * error, continue to process the remaining events, and then throw the first error that occurred.
-         */
-        @Override
-        public boolean process() {
-            AtomicReference<KafkaException> firstError = new AtomicReference<>();
-
-            ProcessHandler<BackgroundEvent> processHandler = (event, error) -> {
-                if (error.isPresent()) {
-                    KafkaException e = error.get();
-
-                    if (!firstError.compareAndSet(null, e)) {
-                        log.warn("An error occurred when processing the event: {}", e.getMessage(), e);
-                    }
-                }
-            };
-
-            boolean hadEvents = process(processHandler);
-
-            if (firstError.get() != null)
-                throw firstError.get();
-
-            return hadEvents;
-        }
-
         @Override
         public void process(final BackgroundEvent event) {
             switch (event.type()) {
@@ -247,7 +217,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private final KafkaConsumerMetrics kafkaConsumerMetrics;
     private Logger log;
     private final String clientId;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private final BackgroundEventProcessor backgroundEventProcessor;
+    private final CompletableEventReaper backgroundEventReaper;
     private final Deserializers<K, V> deserializers;
 
     /**
@@ -294,6 +266,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
              valueDeserializer,
              Time.SYSTEM,
              ApplicationEventHandler::new,
+             CompletableEventReaper::new,
              FetchCollector::new,
              ConsumerMetadata::new,
              new LinkedBlockingQueue<>()
@@ -306,6 +279,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                        final Deserializer<V> valueDeserializer,
                        final Time time,
                        final ApplicationEventHandlerFactory applicationEventHandlerFactory,
+                       final CompletableEventReaperFactory backgroundEventReaperFactory,
                        final FetchCollectorFactory<K, V> fetchCollectorFactory,
                        final ConsumerMetadataFactory metadataFactory,
                        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
@@ -317,6 +291,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
         this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         LogContext logContext = createLogContext(config, groupRebalanceConfig);
+        this.backgroundEventQueue = backgroundEventQueue;
         this.log = logContext.logger(getClass());
 
         log.debug("Initializing the Kafka consumer");
@@ -378,12 +353,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         );
         final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
                 metadata,
-                applicationEventQueue,
                 requestManagersSupplier);
         this.applicationEventHandler = applicationEventHandlerFactory.build(
                 logContext,
                 time,
                 applicationEventQueue,
+                new CompletableEventReaper(logContext),
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
                 requestManagersSupplier);
@@ -395,11 +370,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 new RebalanceCallbackMetricsManager(metrics)
         );
         this.backgroundEventProcessor = new BackgroundEventProcessor(
-                logContext,
-                backgroundEventQueue,
-                applicationEventHandler,
                 rebalanceListenerInvoker
         );
+        this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
         this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
                 config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                 config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
@@ -444,6 +417,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                        Time time,
                        ApplicationEventHandler applicationEventHandler,
                        BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                       CompletableEventReaper backgroundEventReaper,
                        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
                        Metrics metrics,
                        SubscriptionState subscriptions,
@@ -461,12 +435,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
         this.interceptors = Objects.requireNonNull(interceptors);
         this.time = time;
-        this.backgroundEventProcessor = new BackgroundEventProcessor(
-                logContext,
-                backgroundEventQueue,
-                applicationEventHandler,
-                rebalanceListenerInvoker
-        );
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.backgroundEventProcessor = new BackgroundEventProcessor(rebalanceListenerInvoker);
+        this.backgroundEventReaper = backgroundEventReaper;
         this.metrics = metrics;
         this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty()));
         this.metadata = metadata;
@@ -526,7 +497,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
 
         BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
-        BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventQueue = new LinkedBlockingQueue<>();
         BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
         ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
                 logContext,
@@ -563,21 +534,17 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
                 logContext,
                 metadata,
-                applicationEventQueue,
                 requestManagersSupplier
         );
         this.applicationEventHandler = new ApplicationEventHandler(logContext,
                 time,
                 applicationEventQueue,
+                new CompletableEventReaper(logContext),
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
                 requestManagersSupplier);
-        this.backgroundEventProcessor = new BackgroundEventProcessor(
-                logContext,
-                backgroundEventQueue,
-                applicationEventHandler,
-                rebalanceListenerInvoker
-        );
+        this.backgroundEventProcessor = new BackgroundEventProcessor(rebalanceListenerInvoker);
+        this.backgroundEventReaper = new CompletableEventReaper(logContext);
     }
 
     // auxiliary interface for testing
@@ -587,6 +554,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 final LogContext logContext,
                 final Time time,
                 final BlockingQueue<ApplicationEvent> applicationEventQueue,
+                final CompletableEventReaper applicationEventReaper,
                 final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                 final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                 final Supplier<RequestManagers> requestManagersSupplier
@@ -594,6 +562,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     }
 
+    // auxiliary interface for testing
+    interface CompletableEventReaperFactory {
+
+        CompletableEventReaper build(final LogContext logContext);
+
+    }
+
     // auxiliary interface for testing
     interface FetchCollectorFactory<K, V> {
 
@@ -939,14 +914,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return Collections.emptyMap();
         }
 
-        final Timer timer = time.timer(timeout);
         final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(
                 partitions,
-                timer);
+                calculateDeadlineMs(time, timeout));
         wakeupTrigger.setActiveTask(event.future());
         try {
-            final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event,
-                    timer);
+            final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event);
             committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
             return committedOffsets;
         } catch (TimeoutException e) {
@@ -992,12 +965,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             throw new TimeoutException();
         }
 
-        final Timer timer = time.timer(timeout);
-        final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(topic, timer);
+        final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(topic, calculateDeadlineMs(time, timeout));
         wakeupTrigger.setActiveTask(topicMetadataEvent.future());
         try {
             Map<String, List<PartitionInfo>> topicMetadata =
-                    applicationEventHandler.addAndGet(topicMetadataEvent, timer);
+                    applicationEventHandler.addAndGet(topicMetadataEvent);
 
             return topicMetadata.getOrDefault(topic, Collections.emptyList());
         } finally {
@@ -1021,11 +993,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             throw new TimeoutException();
         }
 
-        final Timer timer = time.timer(timeout);
-        final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(timer);
+        final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(calculateDeadlineMs(time, timeout));
         wakeupTrigger.setActiveTask(topicMetadataEvent.future());
         try {
-            return applicationEventHandler.addAndGet(topicMetadataEvent, timer);
+            return applicationEventHandler.addAndGet(topicMetadataEvent);
         } finally {
             wakeupTrigger.clearTask();
         }
@@ -1093,10 +1064,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         if (timestampsToSearch.isEmpty()) {
             return Collections.emptyMap();
         }
-        final Timer timer = time.timer(timeout);
         ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
                 timestampsToSearch,
-                timer,
+                calculateDeadlineMs(time, timeout),
                 true);
 
         // If timeout is set to zero return empty immediately; otherwise try to get the results
@@ -1106,7 +1076,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return listOffsetsEvent.emptyResults();
         }
 
-        return applicationEventHandler.addAndGet(listOffsetsEvent, timer)
+        return applicationEventHandler.addAndGet(listOffsetsEvent)
                 .entrySet()
                 .stream()
                 .collect(Collectors.toMap(
@@ -1153,10 +1123,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         Map<TopicPartition, Long> timestampToSearch = partitions
                 .stream()
                 .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
-        Timer timer = time.timer(timeout);
         ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
                 timestampToSearch,
-                timer,
+                calculateDeadlineMs(time, timeout),
                 false);
 
         // If timeout is set to zero return empty immediately; otherwise try to get the results
@@ -1167,9 +1136,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
 
         Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
-        offsetAndTimestampMap = applicationEventHandler.addAndGet(
-                listOffsetsEvent,
-                timer);
+        offsetAndTimestampMap = applicationEventHandler.addAndGet(listOffsetsEvent);
         return offsetAndTimestampMap.entrySet()
                 .stream()
                 .collect(Collectors.toMap(
@@ -1269,6 +1236,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         if (applicationEventHandler != null)
             closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
         closeTimer.update();
+
+        // close() can be called from inside one of the constructors. In that case, it's possible that neither
+        // the reaper nor the background event queue were constructed, so check them first to avoid NPE.
+        if (backgroundEventReaper != null && backgroundEventQueue != null)
+            backgroundEventReaper.reap(backgroundEventQueue);
+
         closeQuietly(interceptors, "consumer interceptors", firstException);
         closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
         closeQuietly(metrics, "consumer metrics", firstException);
@@ -1295,21 +1268,21 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
         if (!groupMetadata.get().isPresent())
             return;
-        maybeAutoCommitSync(autoCommitEnabled, timer);
+
+        if (autoCommitEnabled)
+            autoCommitSync(timer);
+
         applicationEventHandler.add(new CommitOnCloseEvent());
         completeQuietly(
             () -> {
                 maybeRevokePartitions();
-                applicationEventHandler.addAndGet(new LeaveOnCloseEvent(timer), timer);
+                applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer)));
             },
             "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
     }
 
     // Visible for testing
-    void maybeAutoCommitSync(final boolean shouldAutoCommit,
-                             final Timer timer) {
-        if (!shouldAutoCommit)
-            return;
+    void autoCommitSync(final Timer timer) {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
         log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
         try {
@@ -1376,10 +1349,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         long commitStart = time.nanoseconds();
         try {
-            Timer requestTimer = time.timer(timeout.toMillis());
-            SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer);
+            SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));
             CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
 
+            Timer requestTimer = time.timer(timeout.toMillis());
             awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true);
 
             wakeupTrigger.setActiveTask(commitFuture);
@@ -1523,12 +1496,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         fetchBuffer.retainAll(Collections.emptySet());
         if (groupMetadata.get().isPresent()) {
             Timer timer = time.timer(Long.MAX_VALUE);
-            UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(timer);
+            UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
             applicationEventHandler.add(unsubscribeEvent);
             log.info("Unsubscribing all topics or patterns and assigned partitions");
 
             try {
-                processBackgroundEvents(backgroundEventProcessor, unsubscribeEvent.future(), timer);
+                processBackgroundEvents(unsubscribeEvent.future(), timer);
                 log.info("Unsubscribed all topics or patterns and assigned partitions");
             } catch (TimeoutException e) {
                 log.error("Failed while waiting for the unsubscribe event to complete");
@@ -1637,7 +1610,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             // Validate positions using the partition leader end offsets, to detect if any partition
             // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
             // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(timer), timer);
+            applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer)));
 
             cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
             if (cachedSubscriptionHasAllFetchPositions) return true;
@@ -1660,7 +1633,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             // which are awaiting reset. This will trigger a ListOffset request, retrieve the
             // partition offsets according to the strategy (ex. earliest, latest), and update the
             // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(timer), timer);
+            applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
             return true;
         } catch (TimeoutException e) {
             return false;
@@ -1693,9 +1666,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             final FetchCommittedOffsetsEvent event =
                 new FetchCommittedOffsetsEvent(
                     initializingPartitions,
-                    timer);
+                    calculateDeadlineMs(timer));
             wakeupTrigger.setActiveTask(event.future());
-            final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
+            final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event);
             refreshCommittedOffsets(offsets, metadata, subscriptions);
             return true;
         } catch (TimeoutException e) {
@@ -1722,7 +1695,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         maybeThrowFencedInstanceException();
         offsetCommitCallbackInvoker.executeCallbacks();
         maybeUpdateSubscriptionMetadata();
-        backgroundEventProcessor.process();
+        processBackgroundEvents();
 
         return updateFetchPositions(timer);
     }
@@ -1848,6 +1821,40 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
+     * It is possible that {@link ErrorEvent an error}
+     * could occur when processing the events. In such cases, the processor will take a reference to the first
+     * error, continue to process the remaining events, and then throw the first error that occurred.
+     */
+    private boolean processBackgroundEvents() {
+        AtomicReference<KafkaException> firstError = new AtomicReference<>();
+
+        LinkedList<BackgroundEvent> events = new LinkedList<>();
+        backgroundEventQueue.drainTo(events);
+
+        for (BackgroundEvent event : events) {
+            try {
+                if (event instanceof CompletableEvent)
+                    backgroundEventReaper.add((CompletableEvent<?>) event);
+
+                backgroundEventProcessor.process(event);
+            } catch (Throwable t) {
+                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                if (!firstError.compareAndSet(null, e))
+                    log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+            }
+        }
+
+        backgroundEventReaper.reap(time.milliseconds());
+
+        if (firstError.get() != null)
+            throw firstError.get();
+
+        return !events.isEmpty();
+    }
+
     /**
      * This method can be used by cases where the caller has an event that needs to both block for completion but
      * also process background events. For some events, in order to fully process the associated logic, the
@@ -1870,28 +1877,26 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an
     * {@link UnsubscribeEvent} on the application event queue. That event will eventually trigger the
     * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the
-     * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However,
+     * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked for any
+     * partitions the consumer owns. However,
     * this callback must be executed on the application thread. To achieve this, the background thread enqueues a
     * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is
     * periodically queried by the application thread to see if there's work to be done. When the application thread
     * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a
     * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the
-     * background event queue. Moments later, the background thread will see that event, process it, and continue
+     * application event queue. Moments later, the background thread will see that event, process it, and continue
     * execution of the rebalancing logic. The rebalancing logic cannot complete until the
     * {@link ConsumerRebalanceListener} callback is performed.
     *
-     * @param eventProcessor Event processor that contains the queue of events to process
     * @param future         Event that contains a {@link CompletableFuture}; it is on this future that the
     *                       application thread will wait for completion
     * @param timer          Overall timer that bounds how long to wait for the event to complete
     * @return {@code true} if the event completed within the timeout, {@code false} otherwise
     */
    // Visible for testing
-    <T> T processBackgroundEvents(EventProcessor<?> eventProcessor,
-                                  Future<T> future,
-                                  Timer timer) {
+    <T> T processBackgroundEvents(Future<T> future, Timer timer) {
        do {
-            boolean hadEvents = eventProcessor.process();
+            boolean hadEvents = processBackgroundEvents();

            try {
                if (future.isDone()) {
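The javadoc above describes a two-thread handshake, and the loop that implements it has a simple shape worth calling out. The sketch below is a simplified, hypothetical rendering of that shape (the names and the sleep-based pacing are illustrative, not the real implementation): block on a future while still draining any work the background thread needs run on the application thread.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeoutException;

    // Simplified shape of the "block on a future while pumping background
    // events" loop described above. Names are illustrative only.
    final class BlockAndPump {
        static <T> T waitFor(Future<T> future,
                             BlockingQueue<Runnable> backgroundQueue,
                             long deadlineMs) throws Exception {
            while (true) {
                Runnable task;
                while ((task = backgroundQueue.poll()) != null)
                    task.run(); // e.g. invoke a ConsumerRebalanceListener callback

                if (future.isDone())
                    return future.get();

                if (System.currentTimeMillis() >= deadlineMs)
                    throw new TimeoutException("timed out waiting for event");

                Thread.sleep(10); // the real loop paces itself with a Timer, not a fixed sleep
            }
        }
    }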
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java

@@ -20,6 +20,8 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -31,9 +33,11 @@ import org.slf4j.Logger;
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
@@ -50,6 +54,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
     private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
     private final Time time;
     private final Logger log;
+    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
+    private final CompletableEventReaper applicationEventReaper;
     private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
     private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
     private final Supplier<RequestManagers> requestManagersSupplier;
@@ -63,12 +69,16 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
 
     public ConsumerNetworkThread(LogContext logContext,
                                  Time time,
+                                 BlockingQueue<ApplicationEvent> applicationEventQueue,
+                                 CompletableEventReaper applicationEventReaper,
                                  Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                  Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                                  Supplier<RequestManagers> requestManagersSupplier) {
         super(BACKGROUND_THREAD_NAME, true);
         this.time = time;
         this.log = logContext.logger(getClass());
+        this.applicationEventQueue = applicationEventQueue;
+        this.applicationEventReaper = applicationEventReaper;
         this.applicationEventProcessorSupplier = applicationEventProcessorSupplier;
         this.networkClientDelegateSupplier = networkClientDelegateSupplier;
         this.requestManagersSupplier = requestManagersSupplier;
@@ -125,10 +135,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
      * </ol>
      */
     void runOnce() {
-        // Process the events—if any—that were produced by the application thread. It is possible that when processing
-        // an event generates an error. In such cases, the processor will log an exception, but we do not want those
-        // errors to be propagated to the caller.
-        applicationEventProcessor.process();
+        processApplicationEvents();
 
         final long currentTimeMs = time.milliseconds();
         final long pollWaitTimeMs = requestManagers.entries().stream()
@@ -144,6 +151,36 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
                 .map(Optional::get)
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
+
+        reapExpiredApplicationEvents(currentTimeMs);
     }
+
+    /**
+     * Process the events—if any—that were produced by the application thread.
+     */
+    private void processApplicationEvents() {
+        LinkedList<ApplicationEvent> events = new LinkedList<>();
+        applicationEventQueue.drainTo(events);
+
+        for (ApplicationEvent event : events) {
+            try {
+                if (event instanceof CompletableEvent)
+                    applicationEventReaper.add((CompletableEvent<?>) event);
+
+                applicationEventProcessor.process(event);
+            } catch (Throwable t) {
+                log.warn("Error processing event {}", t.getMessage(), t);
+            }
+        }
+    }
+
+    /**
+     * "Complete" any events that have expired. This cleanup step should only be called after the network I/O
+     * thread has made at least one call to {@link NetworkClientDelegate#poll(long, long) poll} so that each event
+     * is given at least one attempt to satisfy any network requests <em>before</em> checking if a timeout has expired.
+     */
+    private void reapExpiredApplicationEvents(long currentTimeMs) {
+        applicationEventReaper.reap(currentTimeMs);
+    }
 
     /**
@@ -273,9 +310,10 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
             log.error("Unexpected error during shutdown. Proceed with closing.", e);
         } finally {
             sendUnsentRequests(timer);
+            applicationEventReaper.reap(applicationEventQueue);
 
             closeQuietly(requestManagers, "request managers");
             closeQuietly(networkClientDelegate, "network client delegate");
             closeQuietly(applicationEventProcessor, "application event processor");
             log.debug("Closed the consumer network thread");
         }
     }
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java

@@ -1337,6 +1337,7 @@ public class MembershipManagerImpl implements MembershipManager {
                                                            Set<TopicPartition> partitions) {
         SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         sortedPartitions.addAll(partitions);
+
         CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
         backgroundEventHandler.add(event);
         log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName());
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java

@@ -17,14 +17,13 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.utils.Timer;
 
 import java.util.List;
 import java.util.Map;
 
 public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
 
-    protected AbstractTopicMetadataEvent(final Type type, final Timer timer) {
-        super(type, timer);
+    protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
+        super(type, deadlineMs);
     }
 }
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java

@@ -16,11 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.utils.Timer;
-
 public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent {
 
-    public AllTopicsMetadataEvent(final Timer timer) {
-        super(Type.ALL_TOPICS_METADATA, timer);
+    public AllTopicsMetadataEvent(final long deadlineMs) {
+        super(Type.ALL_TOPICS_METADATA, deadlineMs);
     }
 }
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java

@@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
@@ -32,7 +31,6 @@ import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 /**
@@ -49,6 +47,7 @@ public class ApplicationEventHandler implements Closeable {
     public ApplicationEventHandler(final LogContext logContext,
                                    final Time time,
                                    final BlockingQueue<ApplicationEvent> applicationEventQueue,
+                                   final CompletableEventReaper applicationEventReaper,
                                    final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                    final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                                    final Supplier<RequestManagers> requestManagersSupplier) {
@@ -56,6 +55,8 @@ public class ApplicationEventHandler implements Closeable {
         this.applicationEventQueue = applicationEventQueue;
         this.networkThread = new ConsumerNetworkThread(logContext,
                 time,
+                applicationEventQueue,
+                applicationEventReaper,
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
                 requestManagersSupplier);
@@ -99,17 +100,16 @@ public class ApplicationEventHandler implements Closeable {
      *
      * <p/>
      *
-     * See {@link ConsumerUtils#getResult(Future, Timer)} and {@link Future#get(long, TimeUnit)} for more details.
+     * See {@link ConsumerUtils#getResult(Future)} for more details.
      *
     * @param event A {@link CompletableApplicationEvent} created by the polling thread
     * @return Value that is the result of the event
     * @param <T>   Type of return value of the event
      */
-    public <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer) {
+    public <T> T addAndGet(final CompletableApplicationEvent<T> event) {
         Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null");
-        Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null");
         add(event);
-        return ConsumerUtils.getResult(event.future(), timer);
+        return ConsumerUtils.getResult(event.future());
     }
 
     @Override
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
@@ -42,31 +41,20 @@ import java.util.function.Supplier;
  * An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread}
  * which processes {@link ApplicationEvent application events} generated by the application thread.
  */
-public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
+public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
 
     private final Logger log;
     private final ConsumerMetadata metadata;
     private final RequestManagers requestManagers;
 
     public ApplicationEventProcessor(final LogContext logContext,
-                                     final BlockingQueue<ApplicationEvent> applicationEventQueue,
                                      final RequestManagers requestManagers,
                                      final ConsumerMetadata metadata) {
-        super(logContext, applicationEventQueue);
         this.log = logContext.logger(ApplicationEventProcessor.class);
         this.requestManagers = requestManagers;
         this.metadata = metadata;
     }
 
-    /**
-     * Process the events—if any—that were produced by the application thread. It is possible that when processing
-     * an event generates an error. In such cases, the processor will log an exception, but we do not want those
-     * errors to be propagated to the caller.
-     */
-    public boolean process() {
-        return process((event, error) -> error.ifPresent(e -> log.warn("Error processing event {}", e.getMessage(), e)));
-    }
-
     @SuppressWarnings({"CyclomaticComplexity"})
     @Override
     public void process(ApplicationEvent event) {
@@ -273,7 +261,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
         manager.consumerRebalanceListenerCallbackCompleted(event);
     }
 
-    private void process(final CommitOnCloseEvent event) {
+    private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) {
         if (!requestManagers.commitRequestManager.isPresent())
             return;
         log.debug("Signal CommitRequestManager closing");
@@ -309,7 +297,6 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
      */
     public static Supplier<ApplicationEventProcessor> supplier(final LogContext logContext,
                                                                final ConsumerMetadata metadata,
-                                                               final BlockingQueue<ApplicationEvent> applicationEventQueue,
                                                                final Supplier<RequestManagers> requestManagersSupplier) {
         return new CachedSupplier<ApplicationEventProcessor>() {
             @Override
@@ -317,7 +304,6 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
                 RequestManagers requestManagers = requestManagersSupplier.get();
                 return new ApplicationEventProcessor(
                         logContext,
-                        applicationEventQueue,
                         requestManagers,
                         metadata
                 );
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java

@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
 import java.util.Map;
@@ -30,11 +29,6 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
      */
     private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-    protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) {
-        super(type, timer);
-        this.offsets = validate(offsets);
-    }
-
     protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
         super(type, deadlineMs);
         this.offsets = validate(offsets);
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java

@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.utils.Timer;
-
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -32,13 +29,9 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent implements CompletableEvent<T> {
     private final CompletableFuture<T> future;
     private final long deadlineMs;
 
-    protected CompletableApplicationEvent(final Type type, final Timer timer) {
-        super(type);
-        this.future = new CompletableFuture<>();
-        Objects.requireNonNull(timer);
-        this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
-    }
-
+    /**
+     * <em>Note</em>: the {@code deadlineMs} is the future time of expiration, <em>not</em> a timeout.
+     */
     protected CompletableApplicationEvent(final Type type, final long deadlineMs) {
         super(type);
         this.future = new CompletableFuture<>();
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java

@@ -27,10 +27,15 @@ import java.util.concurrent.CompletableFuture;
 public abstract class CompletableBackgroundEvent<T> extends BackgroundEvent implements CompletableEvent<T> {
 
     private final CompletableFuture<T> future;
+    private final long deadlineMs;
 
-    protected CompletableBackgroundEvent(final Type type) {
+    /**
+     * <em>Note</em>: the {@code deadlineMs} is the future time of expiration, <em>not</em> a timeout.
+     */
+    protected CompletableBackgroundEvent(final Type type, final long deadlineMs) {
         super(type);
         this.future = new CompletableFuture<>();
+        this.deadlineMs = deadlineMs;
     }
 
     @Override
@@ -38,8 +43,13 @@ public abstract class CompletableBackgroundEvent<T> extends BackgroundEvent implements CompletableEvent<T> {
         return future;
     }
 
+    @Override
+    public long deadlineMs() {
+        return deadlineMs;
+    }
+
     @Override
     protected String toStringBase() {
-        return super.toStringBase() + ", future=" + future;
+        return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
     }
 }
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java

@@ -16,9 +16,112 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Objects.requireNonNull;
 
+/**
+ * {@code CompletableEvent} is an interface that is used by both {@link CompletableApplicationEvent} and
+ * {@link CompletableBackgroundEvent} for common processing and logic. A {@code CompletableEvent} is one that
+ * allows the caller to get the {@link #future() future} related to the event and the event's
+ * {@link #deadlineMs() expiration timestamp}.
+ *
+ * @param <T> Return type for the event when completed
+ */
 public interface CompletableEvent<T> {
 
+    /**
+     * Returns the {@link CompletableFuture future} associated with this event. Any event will have some related
+     * logic that is executed on its behalf. The event can complete in one of the following ways:
+     *
+     * <ul>
+     *     <li>
+     *         Success: when the logic for the event completes successfully, the data generated by that event
+     *         (if applicable) is passed to {@link CompletableFuture#complete(Object)}. In the case where the generic
+     *         bound type is specified as {@link Void}, {@code null} is provided.</li>
+     *     <li>
+     *         Error: when the event logic generates an error, the error is passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *     </li>
+     *     <li>
+     *         Timeout: when the time spent executing the event logic exceeds the {@link #deadlineMs() deadline}, an
+     *         instance of {@link TimeoutException} should be created and passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}. This also occurs when an event remains
+     *         incomplete when the consumer closes.
+     *     </li>
+     * </ul>
+     *
+     * @return Future on which the caller may block or query for completion
+     *
+     * @see CompletableEventReaper
+     */
     CompletableFuture<T> future();
 
+    /**
+     * This is the deadline that represents the absolute wall clock time by which any event-specific execution should
+     * complete. This is not a timeout value. <em>After</em> this time has passed,
+     * {@link CompletableFuture#completeExceptionally(Throwable)} will be invoked with an instance of
+     * {@link TimeoutException}.
+     *
+     * @return Absolute time for event to be completed
+     *
+     * @see CompletableEventReaper
+     */
+    long deadlineMs();
+
+    /**
+     * Calculate the deadline timestamp based on {@link Timer#currentTimeMs()} and {@link Timer#remainingMs()}.
+     *
+     * @param timer Timer
+     *
+     * @return Absolute time by which event should be completed
+     */
+    static long calculateDeadlineMs(final Timer timer) {
+        requireNonNull(timer);
+        return calculateDeadlineMs(timer.currentTimeMs(), timer.remainingMs());
+    }
+
+    /**
+     * Calculate the deadline timestamp based on {@link Time#milliseconds()} and {@link Duration#toMillis()}.
+     *
+     * @param time     Time
+     * @param duration Duration
+     *
+     * @return Absolute time by which event should be completed
+     */
+    static long calculateDeadlineMs(final Time time, final Duration duration) {
+        return calculateDeadlineMs(requireNonNull(time).milliseconds(), requireNonNull(duration).toMillis());
+    }
+
+    /**
+     * Calculate the deadline timestamp based on {@link Time#milliseconds()} and timeout.
+     *
+     * @param time      Time
+     * @param timeoutMs Timeout, in milliseconds
+     *
+     * @return Absolute time by which event should be completed
+     */
+    static long calculateDeadlineMs(final Time time, final long timeoutMs) {
+        return calculateDeadlineMs(requireNonNull(time).milliseconds(), timeoutMs);
+    }
+
+    /**
+     * Calculate the deadline timestamp based on the current time and timeout.
+     *
+     * @param currentTimeMs Current time, in milliseconds
+     * @param timeoutMs     Timeout, in milliseconds
+     *
+     * @return Absolute time by which event should be completed
+     */
+    static long calculateDeadlineMs(final long currentTimeMs, final long timeoutMs) {
+        if (currentTimeMs > Long.MAX_VALUE - timeoutMs)
+            return Long.MAX_VALUE;
+        else
+            return currentTimeMs + timeoutMs;
+    }
 }
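A quick worked example of why calculateDeadlineMs clamps rather than adding naively: Consumer APIs that wait "forever" pass a timeout of Long.MAX_VALUE, and a plain sum would wrap around to a negative deadline that the reaper would treat as already expired. The snippet below restates the guard from calculateDeadlineMs(long, long) above to show the two cases:

    final class DeadlineDemo {
        // Same clamping logic as calculateDeadlineMs(long, long) above.
        static long calculateDeadlineMs(long currentTimeMs, long timeoutMs) {
            return currentTimeMs > Long.MAX_VALUE - timeoutMs ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
        }

        public static void main(String[] args) {
            long now = 1_700_000_000_000L;                                 // an arbitrary "current" time
            System.out.println(now + Long.MAX_VALUE);                      // naive sum wraps to a negative value
            System.out.println(calculateDeadlineMs(now, Long.MAX_VALUE));  // clamps to Long.MAX_VALUE ("never expires")
        }
    }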
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java (new file)

@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * {@code CompletableEventReaper} is responsible for tracking {@link CompletableEvent time-bound events} and removing
+ * any that exceed their {@link CompletableEvent#deadlineMs() deadline} (unless they've already completed). This
+ * mechanism is used by the {@link AsyncKafkaConsumer} to enforce the timeout provided by the user in its API
+ * calls (e.g. {@link AsyncKafkaConsumer#commitSync(Duration)}).
+ */
+public class CompletableEventReaper {
+
+    private final Logger log;
+
+    /**
+     * List of tracked events that are candidates for expiration.
+     */
+    private final List<CompletableEvent<?>> tracked;
+
+    public CompletableEventReaper(LogContext logContext) {
+        this.log = logContext.logger(CompletableEventReaper.class);
+        this.tracked = new ArrayList<>();
+    }
+
+    /**
+     * Adds a new {@link CompletableEvent event} to track for later completion/expiration.
+     *
+     * @param event Event to track
+     */
+    public void add(CompletableEvent<?> event) {
+        tracked.add(Objects.requireNonNull(event, "Event to track must be non-null"));
+    }
+
+    /**
+     * This method performs a two-step process to "complete" {@link CompletableEvent events} that have either expired
+     * or completed normally:
+     *
+     * <ol>
+     *     <li>
+     *         For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+     *         instance of {@link TimeoutException} is created and passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *     </li>
+     *     <li>
+     *         For each tracked event whose {@link CompletableEvent#future() future} is already in the
+     *         {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+     *     </li>
+     * </ol>
+     *
+     * <p/>
+     *
+     * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper.
+     *
+     * @param currentTimeMs <em>Current</em> time with which to compare against the
+     *                      <em>{@link CompletableEvent#deadlineMs() expiration time}</em>
+     */
+    public void reap(long currentTimeMs) {
+        Consumer<CompletableEvent<?>> expireEvent = event -> {
+            long pastDueMs = currentTimeMs - event.deadlineMs();
+            TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, event.deadlineMs()));
+
+            if (event.future().completeExceptionally(error)) {
+                log.debug("Event {} completed exceptionally since its expiration of {} passed {} ms ago", event, event.deadlineMs(), pastDueMs);
+            } else {
+                log.trace("Event {} not completed exceptionally since it was previously completed", event);
+            }
+        };
+
+        // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
+        tracked.stream()
+                .filter(e -> !e.future().isDone())
+                .filter(e -> currentTimeMs >= e.deadlineMs())
+                .forEach(expireEvent);
+        // Second, remove any events that are already complete, just to make sure we don't hold references. This will
+        // include any events that finished successfully as well as any events we just completed exceptionally above.
+        tracked.removeIf(e -> e.future().isDone());
+    }
+
+    /**
+     * It is possible for the {@link AsyncKafkaConsumer#close() consumer to close} before completing the processing of
+     * all the events in the queue. In this case, we need to
+     * {@link CompletableFuture#completeExceptionally(Throwable) expire} any remaining events.
+     *
+     * <p/>
+     *
+     * Check each of the {@link #add(CompletableEvent) previously-added} {@link CompletableEvent completable events},
+     * and for any that are incomplete, expire them. Also check the core event queue for any incomplete events and
+     * likewise expire them.
+     *
+     * <p/>
+     *
+     * <em>Note</em>: because this is called in the context of {@link AsyncKafkaConsumer#close() closing consumer},
+     * don't take the deadline into consideration, just close it regardless.
+     *
+     * @param events Events from a queue that have not yet been tracked that also need to be reviewed
+     */
+    public void reap(Collection<?> events) {
+        Objects.requireNonNull(events, "Event queue to reap must be non-null");
+
+        Consumer<CompletableEvent<?>> expireEvent = event -> {
+            TimeoutException error = new TimeoutException(String.format("%s could not be completed before the consumer closed", event.getClass().getSimpleName()));
+
+            if (event.future().completeExceptionally(error)) {
+                log.debug("Event {} completed exceptionally since the consumer is closing", event);
+            } else {
+                log.trace("Event {} not completed exceptionally since it was completed prior to the consumer closing", event);
+            }
+        };
+
+        tracked.stream()
+                .filter(e -> !e.future().isDone())
+                .forEach(expireEvent);
+        tracked.clear();
+
+        events.stream()
+                .filter(e -> e instanceof CompletableEvent<?>)
+                .map(e -> (CompletableEvent<?>) e)
+                .filter(e -> !e.future().isDone())
+                .forEach(expireEvent);
+        events.clear();
+    }
+
+    public int size() {
+        return tracked.size();
+    }
+
+    public boolean contains(CompletableEvent<?> event) {
+        return event != null && tracked.contains(event);
+    }
+}
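The new class above is self-contained enough to exercise directly. The test-style sketch below uses the real CompletableEventReaper and CompletableEvent from the diff; only the hard-coded timestamps are invented for illustration.

    import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
    import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
    import org.apache.kafka.common.utils.LogContext;

    import java.util.concurrent.CompletableFuture;

    // Usage sketch for the reaper above (test-style, not production code).
    final class ReaperUsageSketch {
        public static void main(String[] args) {
            CompletableEventReaper reaper = new CompletableEventReaper(new LogContext());

            // An event whose absolute deadline is t = 1000 ms.
            CompletableEvent<Void> event = new CompletableEvent<Void>() {
                private final CompletableFuture<Void> future = new CompletableFuture<>();

                @Override
                public CompletableFuture<Void> future() {
                    return future;
                }

                @Override
                public long deadlineMs() {
                    return 1_000L;
                }
            };

            reaper.add(event);
            reaper.reap(2_000L); // the "current" time is past the deadline...

            // ...so the reaper completed the future exceptionally with a TimeoutException.
            System.out.println(event.future().isCompletedExceptionally()); // true
        }
    }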
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java

@@ -39,7 +39,7 @@ public class ConsumerRebalanceListenerCallbackNeededEvent extends CompletableBackgroundEvent<Void> {
 
     public ConsumerRebalanceListenerCallbackNeededEvent(final ConsumerRebalanceListenerMethodName methodName,
                                                         final SortedSet<TopicPartition> partitions) {
-        super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED);
+        super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, Long.MAX_VALUE);
         this.methodName = Objects.requireNonNull(methodName);
         this.partitions = Collections.unmodifiableSortedSet(partitions);
     }
@@ -16,111 +16,26 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.IdempotentCloser;
-import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.io.Closeable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-
 /**
- * An {@link EventProcessor} is the means by which events <em>produced</em> by thread <em>A</em> are
- * <em>processed</em> by thread <em>B</em>. By definition, threads <em>A</em> and <em>B</em> run in parallel to
- * each other, so a mechanism is needed with which to receive and process the events from the other thread. That
- * communication channel is formed around {@link BlockingQueue a shared queue} into which thread <em>A</em>
- * enqueues events and thread <em>B</em> reads and processes those events.
+ * An {@code EventProcessor} is the means by which events are <em>processed</em>, the meaning of which is left
+ * intentionally loose. This is in large part to keep the {@code EventProcessor} focused on what it means to process
+ * the events, and <em>not</em> linking itself too closely with the rest of the surrounding application.
+ *
+ * <p/>
+ *
+ * The {@code EventProcessor} is envisaged as a stateless service that acts as a conduit, receiving an event and
+ * dispatching to another block of code to process. The semantic meaning of each event is different, so the
+ * {@code EventProcessor} will need to interact with other parts of the system that maintain state. The
+ * implementation should not be concerned with the mechanism by which an event arrived for processing. While the
+ * events are shuffled around the consumer subsystem by means of {@link BlockingQueue shared queues}, it should
+ * be considered an anti-pattern to need to know how it arrived or what happens after it is processed.
  */
-public abstract class EventProcessor<T> implements Closeable {
-
-    private final Logger log;
-    private final BlockingQueue<T> eventQueue;
-    private final IdempotentCloser closer;
-
-    protected EventProcessor(final LogContext logContext, final BlockingQueue<T> eventQueue) {
-        this.log = logContext.logger(EventProcessor.class);
-        this.eventQueue = eventQueue;
-        this.closer = new IdempotentCloser();
-    }
-
-    public abstract boolean process();
-
-    protected abstract void process(T event);
-
-    @Override
-    public void close() {
-        closer.close(this::closeInternal, () -> log.warn("The event processor was already closed"));
-    }
-
-    protected interface ProcessHandler<T> {
-
-        void onProcess(T event, Optional<KafkaException> error);
-    }
+public interface EventProcessor<T> {
 
     /**
-     * Drains all available events from the queue, and then processes them in order. If any errors are thrown while
-     * processing the individual events, these are submitted to the given {@link ProcessHandler}.
+     * Process an event that is received.
      */
-    protected boolean process(ProcessHandler<T> processHandler) {
-        closer.assertOpen("The processor was previously closed, so no further processing can occur");
-
-        List<T> events = drain();
-
-        if (events.isEmpty())
-            return false;
-
-        for (T event : events) {
-            try {
-                Objects.requireNonNull(event, "Attempted to process a null event");
-                process(event);
-                processHandler.onProcess(event, Optional.empty());
-            } catch (Throwable t) {
-                KafkaException error = ConsumerUtils.maybeWrapAsKafkaException(t);
-                processHandler.onProcess(event, Optional.of(error));
-            }
-        }
-
-        return true;
-    }
-
-    /**
-     * It is possible for the consumer to close before complete processing all the events in the queue. In
-     * this case, we need to throw an exception to notify the user the consumer is closed.
-     */
-    private void closeInternal() {
-        log.trace("Closing event processor");
-        List<T> incompleteEvents = drain();
-
-        if (incompleteEvents.isEmpty())
-            return;
-
-        KafkaException exception = new KafkaException("The consumer is closed");
-
-        // Check each of the events and if it has a Future that is incomplete, complete it exceptionally.
-        incompleteEvents
-            .stream()
-            .filter(e -> e instanceof CompletableEvent)
-            .map(e -> ((CompletableEvent<?>) e).future())
-            .filter(f -> !f.isDone())
-            .forEach(f -> {
-                log.debug("Completing {} with exception {}", f, exception.getMessage());
-                f.completeExceptionally(exception);
-            });
-
-        log.debug("Discarding {} events because the consumer is closing", incompleteEvents.size());
-    }
-
-    /**
-     * Moves all the events from the queue to the returned list.
-     */
-    private List<T> drain() {
-        LinkedList<T> events = new LinkedList<>();
-        eventQueue.drainTo(events);
-        return events;
-    }
+    void process(T event);
 }
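
With batching, draining, and lifecycle concerns removed, an implementation of the slimmed-down interface reduces to a dispatcher. A minimal, hypothetical sketch (the real implementations are classes such as ApplicationEventProcessor; the class below is illustrative only):

// Hypothetical EventProcessor implementation: receives one event at a time and
// delegates; it neither drains queues nor manages its own lifecycle.
class PrintingEventProcessor implements EventProcessor<ApplicationEvent> {

    @Override
    public void process(ApplicationEvent event) {
        // A real implementation switches on the event type and hands off to the
        // stateful parts of the system (e.g., the request managers).
        System.out.println("Processing event: " + event);
    }
}
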
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
 import java.util.Map;
@@ -31,8 +30,8 @@ public class FetchCommittedOffsetsEvent extends CompletableApplicationEvent<Map<
      */
     private final Set<TopicPartition> partitions;
 
-    public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions, final Timer timer) {
-        super(Type.FETCH_COMMITTED_OFFSETS, timer);
+    public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions, final long deadlineMs) {
+        super(Type.FETCH_COMMITTED_OFFSETS, deadlineMs);
         this.partitions = Collections.unmodifiableSet(partitions);
     }
 
@@ -16,11 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.utils.Timer;
-
 public class LeaveOnCloseEvent extends CompletableApplicationEvent<Void> {
 
-    public LeaveOnCloseEvent(final Timer timer) {
-        super(Type.LEAVE_ON_CLOSE, timer);
+    public LeaveOnCloseEvent(final long deadlineMs) {
+        super(Type.LEAVE_ON_CLOSE, deadlineMs);
     }
 }
 
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals.events;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,9 +37,9 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
     private final boolean requireTimestamps;
 
     public ListOffsetsEvent(Map<TopicPartition, Long> timestampToSearch,
-                            Timer timer,
+                            long deadlineMs,
                             boolean requireTimestamps) {
-        super(Type.LIST_OFFSETS, timer);
+        super(Type.LIST_OFFSETS, deadlineMs);
         this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch);
         this.requireTimestamps = requireTimestamps;
     }
 
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.utils.Timer;
-
 /**
  * Event for resetting offsets for all assigned partitions that require it. This is an
  * asynchronous event that generates ListOffsets requests, and completes by updating in-memory
@@ -26,7 +24,7 @@
  */
 public class ResetPositionsEvent extends CompletableApplicationEvent<Void> {
 
-    public ResetPositionsEvent(final Timer timer) {
-        super(Type.RESET_POSITIONS, timer);
+    public ResetPositionsEvent(final long deadlineMs) {
+        super(Type.RESET_POSITIONS, deadlineMs);
     }
 }
 
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Timer;
 
 import java.util.Map;
 
@@ -28,7 +27,7 @@ import java.util.Map;
  */
 public class SyncCommitEvent extends CommitEvent {
 
-    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) {
-        super(Type.COMMIT_SYNC, offsets, timer);
+    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
+        super(Type.COMMIT_SYNC, offsets, deadlineMs);
     }
 }
 
@@ -16,16 +16,14 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.utils.Timer;
-
 import java.util.Objects;
 
 public class TopicMetadataEvent extends AbstractTopicMetadataEvent {
 
     private final String topic;
 
-    public TopicMetadataEvent(final String topic, final Timer timer) {
-        super(Type.TOPIC_METADATA, timer);
+    public TopicMetadataEvent(final String topic, final long deadlineMs) {
+        super(Type.TOPIC_METADATA, deadlineMs);
         this.topic = Objects.requireNonNull(topic);
     }
 
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.utils.Timer;
-
 /**
  * Application event triggered when a user calls the unsubscribe API. This will make the consumer
  * release all its assignments and send a heartbeat request to leave the consumer group.
@@ -28,8 +26,8 @@
  */
 public class UnsubscribeEvent extends CompletableApplicationEvent<Void> {
 
-    public UnsubscribeEvent(final Timer timer) {
-        super(Type.UNSUBSCRIBE, timer);
+    public UnsubscribeEvent(final long deadlineMs) {
+        super(Type.UNSUBSCRIBE, deadlineMs);
     }
 }
 
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.utils.Timer;
-
 /**
  * Event for validating offsets for all assigned partitions for which a leader change has been
  * detected. This is an asynchronous event that generates OffsetForLeaderEpoch requests, and
@@ -26,7 +24,7 @@
  */
 public class ValidatePositionsEvent extends CompletableApplicationEvent<Void> {
 
-    public ValidatePositionsEvent(final Timer timer) {
-        super(Type.VALIDATE_POSITIONS, timer);
+    public ValidatePositionsEvent(final long deadlineMs) {
+        super(Type.VALIDATE_POSITIONS, deadlineMs);
     }
 }
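
Each of these event constructors now takes an absolute deadline in epoch milliseconds rather than a Timer. A sketch of the resulting calling convention, assuming the CompletableEvent.calculateDeadlineMs helper that the updated tests below import statically (time, timeoutMs, and applicationEventHandler are illustrative variables):

// Convert a relative timeout into the absolute deadline the constructors expect,
// then hand the event to the handler; the reaper expires it if the deadline passes.
long deadlineMs = calculateDeadlineMs(time, timeoutMs);
UnsubscribeEvent event = new UnsubscribeEvent(deadlineMs);
applicationEventHandler.addAndGet(event);
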
@@ -39,6 +39,7 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
@@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest {
     private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+    private final CompletableEventReaper backgroundEventReaper = mock(CompletableEventReaper.class);
 
     @AfterEach
     public void resetAll() {
@@ -190,7 +192,8 @@ public class AsyncKafkaConsumerTest {
             new StringDeserializer(),
             new StringDeserializer(),
             time,
-            (a, b, c, d, e, f) -> applicationEventHandler,
+            (a, b, c, d, e, f, g) -> applicationEventHandler,
+            a -> backgroundEventReaper,
             (a, b, c, d, e, f, g) -> fetchCollector,
             (a, b, c, d) -> metadata,
             backgroundEventQueue
@@ -218,6 +221,7 @@ public class AsyncKafkaConsumerTest {
             time,
             applicationEventHandler,
             backgroundEventQueue,
+            backgroundEventReaper,
             rebalanceListenerInvoker,
             new Metrics(),
             subscriptions,
@@ -318,6 +322,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testCommitAsyncWithFencedException() {
         consumer = newConsumer();
+        completeCommitSyncApplicationEventSuccessfully();
         final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         MockCommitCallback callback = new MockCommitCallback();
 
@@ -339,7 +344,7 @@ public class AsyncKafkaConsumerTest {
         completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);
 
         assertEquals(topicPartitionOffsets, consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), any());
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
         final Metric metric = consumer.metrics()
             .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
         assertTrue((double) metric.metricValue() > 0);
@@ -361,7 +366,7 @@ public class AsyncKafkaConsumerTest {
 
         verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), any());
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
     }
 
     @Test
@@ -369,7 +374,7 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer();
         Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         when(applicationEventHandler.addAndGet(
-            any(FetchCommittedOffsetsEvent.class), any())).thenAnswer(invocation -> {
+            any(FetchCommittedOffsetsEvent.class))).thenAnswer(invocation -> {
             CompletableApplicationEvent<?> event = invocation.getArgument(0);
             assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
             throw new KafkaException("Test exception");
@@ -387,6 +392,7 @@ public class AsyncKafkaConsumerTest {
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         consumer.assign(singleton(tp));
 
@@ -408,6 +414,7 @@ public class AsyncKafkaConsumerTest {
         }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         consumer.assign(singleton(tp));
 
@@ -431,6 +438,7 @@ public class AsyncKafkaConsumerTest {
         }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         consumer.assign(singleton(tp));
 
@@ -486,6 +494,7 @@ public class AsyncKafkaConsumerTest {
             .when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         consumer.assign(singleton(tp));
 
@@ -558,6 +567,7 @@ public class AsyncKafkaConsumerTest {
             singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
+        completeCommitSyncApplicationEventSuccessfully();
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
         HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
@@ -759,9 +769,9 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testVerifyApplicationEventOnShutdown() {
         consumer = newConsumer();
-        doReturn(null).when(applicationEventHandler).addAndGet(any(), any());
+        doReturn(null).when(applicationEventHandler).addAndGet(any());
         consumer.close();
-        verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class), any());
+        verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class));
         verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
     }
 
@@ -804,7 +814,7 @@ public class AsyncKafkaConsumerTest {
         subscriptions.assignFromSubscribed(singleton(tp));
         doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp)));
         assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
-        verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseEvent.class), any());
+        verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseEvent.class));
         verify(listener).onPartitionsRevoked(eq(singleton(tp)));
         assertEquals(emptySet(), subscriptions.assignedPartitions());
     }
@@ -827,6 +837,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testAutoCommitSyncEnabled() {
+        completeCommitSyncApplicationEventSuccessfully();
         SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
         consumer = newConsumer(
             mock(FetchBuffer.class),
@@ -839,7 +850,7 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
-        consumer.maybeAutoCommitSync(true, time.timer(100));
+        consumer.autoCommitSync(time.timer(100));
         verify(applicationEventHandler).add(any(SyncCommitEvent.class));
     }
 
@@ -857,7 +868,6 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
-        consumer.maybeAutoCommitSync(false, time.timer(100));
         verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class));
     }
 
@@ -936,8 +946,9 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer();
         Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = mockOffsetAndTimestamp();
 
-        when(applicationEventHandler.addAndGet(any(ListOffsetsEvent.class), any())).thenAnswer(invocation -> {
-            Timer timer = invocation.getArgument(1);
+        when(applicationEventHandler.addAndGet(any(ListOffsetsEvent.class))).thenAnswer(invocation -> {
+            ListOffsetsEvent event = invocation.getArgument(0);
+            Timer timer = time.timer(event.deadlineMs() - time.milliseconds());
             if (timer.remainingMs() == 0) {
                 fail("Timer duration should not be zero.");
             }
@@ -950,7 +961,7 @@ public class AsyncKafkaConsumerTest {
             assertTrue(result.containsKey(key));
             assertEquals(value.offset(), result.get(key));
         });
-        verify(applicationEventHandler).addAndGet(any(ListOffsetsEvent.class), any(Timer.class));
+        verify(applicationEventHandler).addAndGet(any(ListOffsetsEvent.class));
     }
 
     @Test
@@ -960,26 +971,23 @@ public class AsyncKafkaConsumerTest {
         Throwable eventProcessingFailure = new KafkaException("Unexpected failure " +
             "processing List Offsets event");
         doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(
-            any(ListOffsetsEvent.class),
-            any());
+            any(ListOffsetsEvent.class));
         Throwable consumerError = assertThrows(KafkaException.class,
             () -> consumer.beginningOffsets(partitions,
                 Duration.ofMillis(1)));
         assertEquals(eventProcessingFailure, consumerError);
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }
 
     @Test
     public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() {
         consumer = newConsumer();
-        doThrow(new TimeoutException()).when(applicationEventHandler).addAndGet(any(), any());
+        doThrow(new TimeoutException()).when(applicationEventHandler).addAndGet(any());
         assertThrows(TimeoutException.class,
             () -> consumer.beginningOffsets(
                 Collections.singletonList(new TopicPartition("t1", 0)),
                 Duration.ofMillis(1)));
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }
 
     @Test
@@ -1014,15 +1022,14 @@ public class AsyncKafkaConsumerTest {
         Map<TopicPartition, OffsetAndTimestampInternal> expectedResult = mockOffsetAndTimestamp();
         Map<TopicPartition, Long> timestampToSearch = mockTimestampToSearch();
 
-        doReturn(expectedResult).when(applicationEventHandler).addAndGet(any(), any());
+        doReturn(expectedResult).when(applicationEventHandler).addAndGet(any());
         Map<TopicPartition, OffsetAndTimestamp> result =
             assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1)));
         expectedResult.forEach((key, value) -> {
             OffsetAndTimestamp expected = value.buildOffsetAndTimestamp();
             assertEquals(expected, result.get(key));
         });
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }
 
     // This test ensures same behaviour as the current consumer when offsetsForTimes is called
@@ -1049,8 +1056,7 @@ public class AsyncKafkaConsumerTest {
         Map<TopicPartition, OffsetAndTimestamp> result =
             assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, Duration.ZERO));
         assertEquals(expectedResult, result);
-        verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }
 
     @Test
@@ -1059,13 +1065,12 @@ public class AsyncKafkaConsumerTest {
         final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         doAnswer(invocation -> {
             CompletableApplicationEvent<?> event = invocation.getArgument(0);
-            Timer timer = invocation.getArgument(1);
             assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
             assertTrue(event.future().isCompletedExceptionally());
-            return ConsumerUtils.getResult(event.future(), timer);
+            return ConsumerUtils.getResult(event.future());
         })
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsEvent.class), any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class));
 
         consumer.wakeup();
         assertThrows(WakeupException.class, () -> consumer.committed(offsets.keySet()));
@@ -1216,6 +1221,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testRefreshCommittedOffsetsSuccess() {
         consumer = newConsumer();
+        completeCommitSyncApplicationEventSuccessfully();
         TopicPartition partition = new TopicPartition("t1", 1);
         Set<TopicPartition> partitions = Collections.singleton(partition);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = Collections.singletonMap(partition, new OffsetAndMetadata(10L));
@@ -1661,20 +1667,20 @@ public class AsyncKafkaConsumerTest {
         consumer.poll(Duration.ZERO);
 
         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class));
 
         if (committedOffsetsEnabled) {
             // Verify there was a FetchCommittedOffsets event and no ResetPositions event
             verify(applicationEventHandler, atLeast(1))
-                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
             verify(applicationEventHandler, never())
-                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
         } else {
             // Verify there was no FetchCommittedOffsets event, but there should be a ResetPositions event
             verify(applicationEventHandler, never())
-                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
             verify(applicationEventHandler, atLeast(1))
-                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
         }
     }
 
@@ -1689,11 +1695,11 @@ public class AsyncKafkaConsumerTest {
         consumer.poll(Duration.ZERO);
 
         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class));
         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
     }
 
     @Test
@@ -1730,7 +1736,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) processBackgroundEvents}
      * handles the case where the {@link Future} takes a bit of time to complete, but does so within the timeout.
      */
     @Test
@@ -1756,16 +1762,14 @@ public class AsyncKafkaConsumerTest {
             return null;
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        try (EventProcessor<?> processor = mock(EventProcessor.class)) {
-            consumer.processBackgroundEvents(processor, future, timer);
+        consumer.processBackgroundEvents(future, timer);
 
-            // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
-            assertEquals(800, timer.remainingMs());
-        }
+        // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
+        assertEquals(800, timer.remainingMs());
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) processBackgroundEvents}
      * handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
      */
     @Test
@@ -1776,17 +1780,15 @@ public class AsyncKafkaConsumerTest {
         // Create a future that is already completed.
         CompletableFuture<?> future = CompletableFuture.completedFuture(null);
 
-        try (EventProcessor<?> processor = mock(EventProcessor.class)) {
-            consumer.processBackgroundEvents(processor, future, timer);
+        consumer.processBackgroundEvents(future, timer);
 
-            // Because we didn't need to perform a timed get, we should still have every last millisecond
-            // of our initial timeout.
-            assertEquals(1000, timer.remainingMs());
-        }
+        // Because we didn't need to perform a timed get, we should still have every last millisecond
+        // of our initial timeout.
+        assertEquals(1000, timer.remainingMs());
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) processBackgroundEvents}
      * handles the case where the {@link Future} does not complete within the timeout.
      */
     @Test
@@ -1801,12 +1803,10 @@ public class AsyncKafkaConsumerTest {
             throw new java.util.concurrent.TimeoutException("Intentional timeout");
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        try (EventProcessor<?> processor = mock(EventProcessor.class)) {
-            assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(processor, future, timer));
+        assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer));
 
-            // Because we forced our mocked future to continuously time out, we should have no time remaining.
-            assertEquals(0, timer.remainingMs());
-        }
+        // Because we forced our mocked future to continuously time out, we should have no time remaining.
+        assertEquals(0, timer.remainingMs());
     }
 
     /**
@@ -1836,6 +1836,30 @@ public class AsyncKafkaConsumerTest {
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
 
+    @Test
+    void testReaperInvokedInClose() {
+        consumer = newConsumer();
+        consumer.close();
+        verify(backgroundEventReaper).reap(backgroundEventQueue);
+    }
+
+    @Test
+    void testReaperInvokedInUnsubscribe() {
+        consumer = newConsumer();
+        completeUnsubscribeApplicationEventSuccessfully();
+        consumer.unsubscribe();
+        verify(backgroundEventReaper).reap(time.milliseconds());
+    }
+
+    @Test
+    void testReaperInvokedInPoll() {
+        consumer = newConsumer();
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.subscribe(Collections.singletonList("topic"));
+        consumer.poll(Duration.ZERO);
+        verify(backgroundEventReaper).reap(time.milliseconds());
+    }
+
     private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
@@ -1898,13 +1922,13 @@ public class AsyncKafkaConsumerTest {
     private void completeFetchedCommittedOffsetApplicationEventSuccessfully(final Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
         doReturn(committedOffsets)
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsEvent.class), any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class));
     }
 
     private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) {
         doThrow(ex)
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsEvent.class), any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class));
     }
 
     private void completeUnsubscribeApplicationEventSuccessfully() {
 
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProces
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
@@ -32,6 +34,7 @@ import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
 import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -41,7 +44,6 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -55,6 +57,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
@@ -62,6 +65,8 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
 import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,7 +75,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -78,7 +85,7 @@ import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
 
-    private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder;
+    private ConsumerTestBuilder testBuilder;
     private Time time;
     private ConsumerMetadata metadata;
     private NetworkClientDelegate networkClient;
@@ -88,11 +95,12 @@ public class ConsumerNetworkThreadTest {
     private CommitRequestManager commitRequestManager;
     private CoordinatorRequestManager coordinatorRequestManager;
     private ConsumerNetworkThread consumerNetworkThread;
+    private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
     private MockClient client;
 
     @BeforeEach
     public void setup() {
-        testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder();
+        testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
         time = testBuilder.time;
         metadata = testBuilder.metadata;
         networkClient = testBuilder.networkClientDelegate;
@@ -102,14 +110,24 @@ public class ConsumerNetworkThreadTest {
         commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
         offsetsRequestManager = testBuilder.offsetsRequestManager;
         coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-        consumerNetworkThread = testBuilder.consumerNetworkThread;
+        consumerNetworkThread = new ConsumerNetworkThread(
+            testBuilder.logContext,
+            time,
+            testBuilder.applicationEventQueue,
+            applicationEventReaper,
+            () -> applicationEventProcessor,
+            () -> testBuilder.networkClientDelegate,
+            () -> testBuilder.requestManagers
+        );
+        consumerNetworkThread.initializeResources();
     }
 
     @AfterEach
     public void tearDown() {
-        if (testBuilder != null)
+        if (testBuilder != null) {
             testBuilder.close();
+            consumerNetworkThread.close(Duration.ZERO);
+        }
     }
 
     @Test
@@ -157,8 +175,7 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testSyncCommitEvent() {
-        Timer timer = time.timer(100);
-        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), timer);
+        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
@@ -168,8 +185,7 @@ public class ConsumerNetworkThreadTest {
     @ValueSource(booleans = {true, false})
     public void testListOffsetsEventIsProcessed(boolean requireTimestamp) {
         Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L);
-        Timer timer = time.timer(100);
-        ApplicationEvent e = new ListOffsetsEvent(timestamps, timer, requireTimestamp);
+        ApplicationEvent e = new ListOffsetsEvent(timestamps, calculateDeadlineMs(time, 100), requireTimestamp);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
@@ -178,8 +194,7 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testResetPositionsEventIsProcessed() {
-        Timer timer = time.timer(100);
-        ResetPositionsEvent e = new ResetPositionsEvent(timer);
+        ResetPositionsEvent e = new ResetPositionsEvent(calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
@@ -190,8 +205,7 @@ public class ConsumerNetworkThreadTest {
     public void testResetPositionsProcessFailureIsIgnored() {
         doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
 
-        Timer timer = time.timer(100);
-        ResetPositionsEvent event = new ResetPositionsEvent(timer);
+        ResetPositionsEvent event = new ResetPositionsEvent(calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(event);
         assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
 
@@ -200,8 +214,7 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testValidatePositionsEventIsProcessed() {
-        Timer timer = time.timer(100);
-        ValidatePositionsEvent e = new ValidatePositionsEvent(timer);
+        ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
@@ -226,8 +239,7 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     void testFetchTopicMetadata() {
-        Timer timer = time.timer(Long.MAX_VALUE);
-        applicationEventsQueue.add(new TopicMetadataEvent("topic", timer));
+        applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE));
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
     }
@@ -282,6 +294,22 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     void testEnsureEventsAreCompleted() {
+        // Mimic the logic of CompletableEventReaper.reap(Collection):
+        doAnswer(__ -> {
+            Iterator<ApplicationEvent> i = applicationEventsQueue.iterator();
+
+            while (i.hasNext()) {
+                ApplicationEvent event = i.next();
+
+                if (event instanceof CompletableEvent)
+                    ((CompletableEvent<?>) event).future().completeExceptionally(new TimeoutException());
+
+                i.remove();
+            }
+
+            return null;
+        }).when(applicationEventReaper).reap(any(Collection.class));
+
         Node node = metadata.fetch().nodes().get(0);
         coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds());
         client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node));
@@ -294,12 +322,23 @@ public class ConsumerNetworkThreadTest {
         applicationEventsQueue.add(event2);
         assertFalse(future.isDone());
         assertFalse(applicationEventsQueue.isEmpty());
 
         consumerNetworkThread.cleanup();
         assertTrue(future.isCompletedExceptionally());
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
+    @Test
+    void testCleanupInvokesReaper() {
+        consumerNetworkThread.cleanup();
+        verify(applicationEventReaper).reap(applicationEventsQueue);
+    }
+
+    @Test
+    void testRunOnceInvokesReaper() {
+        consumerNetworkThread.runOnce();
+        verify(applicationEventReaper).reap(any(Long.class));
+    }
+
     private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets,
                                             final Errors error,
                                             final boolean disconnected) {
 
@@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Optional;
 import java.util.Properties;
@@ -270,7 +269,6 @@ public class ConsumerTestBuilder implements Closeable {
         );
         this.applicationEventProcessor = spy(new ApplicationEventProcessor(
             logContext,
-            applicationEventQueue,
             requestManagers,
             metadata
         )
@@ -287,32 +285,6 @@ public class ConsumerTestBuilder implements Closeable {
     @Override
     public void close() {
         closeQuietly(requestManagers, RequestManagers.class.getSimpleName());
         closeQuietly(applicationEventProcessor, ApplicationEventProcessor.class.getSimpleName());
     }
 
-    public static class ConsumerNetworkThreadTestBuilder extends ConsumerTestBuilder {
-
-        final ConsumerNetworkThread consumerNetworkThread;
-
-        public ConsumerNetworkThreadTestBuilder() {
-            this(createDefaultGroupInformation());
-        }
-
-        public ConsumerNetworkThreadTestBuilder(Optional<GroupInformation> groupInfo) {
-            super(groupInfo);
-            this.consumerNetworkThread = new ConsumerNetworkThread(
-                logContext,
-                time,
-                () -> applicationEventProcessor,
-                () -> networkClientDelegate,
-                () -> requestManagers
-            );
-        }
-
-        @Override
-        public void close() {
-            consumerNetworkThread.close(Duration.ZERO);
-        }
-    }
-
     public static class GroupInformation {
 
@@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -39,6 +38,7 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -77,7 +77,6 @@ public class ApplicationEventProcessorTest {
         );
         processor = new ApplicationEventProcessor(
             new LogContext(),
-            applicationEventQueue,
             requestManagers,
             metadata
         );
@@ -93,8 +92,7 @@ public class ApplicationEventProcessorTest {
 
     @Test
     public void testPrepClosingLeaveGroupEvent() {
-        Timer timer = time.timer(100);
-        LeaveOnCloseEvent event = new LeaveOnCloseEvent(timer);
+        LeaveOnCloseEvent event = new LeaveOnCloseEvent(calculateDeadlineMs(time, 100));
         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
         when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
         processor.process(event);
 
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompletableEventReaperTest {
+
+    private final LogContext logContext = new LogContext();
+    private final Time time = new MockTime();
+    private final CompletableEventReaper reaper = new CompletableEventReaper(logContext);
+
+    @Test
+    public void testExpired() {
+        // Add a new event to the reaper.
+        long timeoutMs = 100;
+        UnsubscribeEvent event = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event);
+
+        // Without any time passing, we check the reaper and verify that the event is not done and is still
+        // being tracked.
+        reaper.reap(time.milliseconds());
+        assertFalse(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // Sleep for at least 1 ms *more* than the timeout so that the event is considered expired.
+        time.sleep(timeoutMs + 1);
+
+        // However, until we actually invoke the reaper, the event isn't complete and is still being tracked.
+        assertFalse(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // Call the reaper and validate that the event is now "done" (expired), the correct exception type is
+        // thrown, and the event is no longer tracked.
+        reaper.reap(time.milliseconds());
+        assertTrue(event.future().isDone());
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future()));
+        assertEquals(0, reaper.size());
+    }
+
+    @Test
+    public void testCompleted() {
+        // Add a new event to the reaper.
+        long timeoutMs = 100;
+        UnsubscribeEvent event = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event);
+
+        // Without any time passing, we check the reaper and verify that the event is not done and is still
+        // being tracked.
+        reaper.reap(time.milliseconds());
+        assertFalse(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // We'll cause the event to be completed normally. Note that because we haven't called the reaper, the
+        // event is still being tracked.
+        event.future().complete(null);
+        assertTrue(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // To ensure we don't accidentally expire an event that completed normally, sleep past the timeout.
+        time.sleep(timeoutMs + 1);
+
+        // Call the reaper and validate that the event is not considered expired, but is still no longer tracked.
+        reaper.reap(time.milliseconds());
+        assertTrue(event.future().isDone());
+        assertNull(ConsumerUtils.getResult(event.future()));
+        assertEquals(0, reaper.size());
+    }
+
+    @Test
+    public void testCompletedAndExpired() {
+        // Add two events to the reaper. One event will be completed, the other we will let expire.
+        long timeoutMs = 100;
+        UnsubscribeEvent event1 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        UnsubscribeEvent event2 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event1);
+        reaper.add(event2);
+
+        // Without any time passing, we check the reaper and verify that the events are not done and are still
+        // being tracked.
+        reaper.reap(time.milliseconds());
+        assertFalse(event1.future().isDone());
+        assertFalse(event2.future().isDone());
+        assertEquals(2, reaper.size());
+
+        // We'll cause the first event to be completed normally, but then sleep past the timer deadline.
+        event1.future().complete(null);
+        assertTrue(event1.future().isDone());
+
+        time.sleep(timeoutMs + 1);
+
+        // Though the first event is completed, it's still being tracked, along with the second expired event.
+        assertEquals(2, reaper.size());
+
+        // Validate that the first (completed) event is not expired, but the second one is expired. In either case,
+        // both should be completed and neither should be tracked anymore.
+        reaper.reap(time.milliseconds());
+        assertTrue(event1.future().isDone());
+        assertTrue(event2.future().isDone());
+        assertNull(ConsumerUtils.getResult(event1.future()));
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future()));
+        assertEquals(0, reaper.size());
+    }
+
+    @Test
+    public void testIncompleteQueue() {
+        long timeoutMs = 100;
+        UnsubscribeEvent event1 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        UnsubscribeEvent event2 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+
+        Collection<CompletableApplicationEvent<?>> queue = new ArrayList<>();
+        queue.add(event1);
+        queue.add(event2);
+
+        // Complete one of our events, just to make sure it isn't inadvertently canceled.
+        event1.future().complete(null);
+
+        // In this test, our events aren't tracked in the reaper, just in the queue.
+        assertEquals(0, reaper.size());
+        assertEquals(2, queue.size());
+
+        // Go ahead and reap the incomplete events from the queue.
+        reaper.reap(queue);
+
+        // The first event was completed, so we didn't expire it in the reaper.
+        assertTrue(event1.future().isDone());
+        assertFalse(event1.future().isCancelled());
+        assertNull(ConsumerUtils.getResult(event1.future()));
+
+        // The second event was incomplete, so it was expired.
+        assertTrue(event2.future().isCompletedExceptionally());
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future()));
+
+        // Because the events aren't tracked in the reaper *and* the queue is cleared as part of the
+        // cancellation process, both data structures should now be empty.
+        assertEquals(0, reaper.size());
+        assertEquals(0, queue.size());
+    }
+
+    @Test
+    public void testIncompleteTracked() {
+        // This queue is just here to test the case where the queue is empty.
+        Collection<CompletableApplicationEvent<?>> queue = new ArrayList<>();
+
+        // Add two events for the reaper to track.
+        long timeoutMs = 100;
+        UnsubscribeEvent event1 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        UnsubscribeEvent event2 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event1);
+        reaper.add(event2);
+
+        // Complete one of our events, just to make sure it isn't inadvertently canceled.
+        event1.future().complete(null);
+
+        // In this test, our events are tracked exclusively in the reaper, not the queue.
+        assertEquals(2, reaper.size());
+
+        // Go ahead and reap the incomplete events. Both sets should be zero after that.
+        reaper.reap(queue);
+        assertEquals(0, reaper.size());
+        assertEquals(0, queue.size());
+
+        // The first event was completed, so we didn't cancel it in the reaper.
+        assertTrue(event1.future().isDone());
+        assertNull(ConsumerUtils.getResult(event1.future()));
+
+        // The second event was incomplete, so it was canceled.
+        assertTrue(event2.future().isCompletedExceptionally());
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future()));
+    }
+}