mirror of https://github.com/apache/kafka.git
Updates to fix inverted logic in maybeInterruptCompositePoll()
This commit is contained in:
parent
d3fa910d10
commit
dbc4773a34
|
@ -249,7 +249,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
// If there are enqueued callbacks to invoke, exit to the application thread.
|
||||
RequiresApplicationThreadExecution test = () -> offsetCommitCallbackInvoker.isPresent() && offsetCommitCallbackInvoker.get().size() > 0;
|
||||
|
||||
if (maybePauseCompositePoll(test, event.future(), CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED))
|
||||
if (maybeInterruptCompositePoll(test, event.future(), CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED))
|
||||
return;
|
||||
|
||||
nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA;
|
||||
|
@ -265,7 +265,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
// If there are background events to process, exit to the application thread.
|
||||
RequiresApplicationThreadExecution test = () -> backgroundEventHandler.size() > 0;
|
||||
|
||||
if (maybePauseCompositePoll(test, event.future(), CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED))
|
||||
if (maybeInterruptCompositePoll(test, event.future(), CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED))
|
||||
return;
|
||||
|
||||
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;
|
||||
|
@ -305,10 +305,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
event.future().completeExceptionally(new IllegalArgumentException("Unknown next step for composite poll: " + nextEventType));
|
||||
}
|
||||
|
||||
private boolean maybePauseCompositePoll(RequiresApplicationThreadExecution test,
|
||||
CompletableFuture<CompositePollEvent.State> future,
|
||||
CompositePollEvent.State state) {
|
||||
if (test.requiresApplicationThread())
|
||||
private boolean maybeInterruptCompositePoll(RequiresApplicationThreadExecution test,
|
||||
CompletableFuture<CompositePollEvent.State> future,
|
||||
CompositePollEvent.State state) {
|
||||
if (!test.requiresApplicationThread())
|
||||
return false;
|
||||
|
||||
log.debug("******** TEMP DEBUG ******** Pausing composite poll at state {}", state);
|
||||
|
|
|
@ -109,6 +109,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -509,7 +510,13 @@ public class AsyncKafkaConsumerTest {
|
|||
completeTopicSubscriptionChangeEventSuccessfully();
|
||||
consumer.subscribe(Collections.singletonList(topicName), listener);
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
|
||||
markResultForCompositePollEvent(
|
||||
List.of(
|
||||
CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED,
|
||||
CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED,
|
||||
CompositePollEvent.State.COMPLETE
|
||||
)
|
||||
);
|
||||
consumer.poll(Duration.ZERO);
|
||||
assertTrue(callbackExecuted.get());
|
||||
}
|
||||
|
@ -535,6 +542,7 @@ public class AsyncKafkaConsumerTest {
|
|||
markResultForCompositePollEvent(CompositePollEvent.State.COMPLETE);
|
||||
consumer.poll(Duration.ZERO);
|
||||
|
||||
markResultForCompositePollEvent(CompositePollEvent.State.COMPLETE);
|
||||
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
|
||||
}
|
||||
|
||||
|
@ -679,7 +687,13 @@ public class AsyncKafkaConsumerTest {
|
|||
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
|
||||
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
markResultForCompositePollEvent(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED);
|
||||
markResultForCompositePollEvent(
|
||||
List.of(
|
||||
CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED,
|
||||
CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED,
|
||||
CompositePollEvent.State.COMPLETE
|
||||
)
|
||||
);
|
||||
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback);
|
||||
}
|
||||
|
||||
|
@ -1480,7 +1494,12 @@ public class AsyncKafkaConsumerTest {
|
|||
}
|
||||
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
|
||||
markResultForCompositePollEvent(
|
||||
List.of(
|
||||
CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED,
|
||||
CompositePollEvent.State.COMPLETE
|
||||
)
|
||||
);
|
||||
|
||||
// This will trigger the background event queue to process our background event message.
|
||||
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
|
||||
|
@ -1848,7 +1867,12 @@ public class AsyncKafkaConsumerTest {
|
|||
consumer.subscribe(Collections.singletonList("topic"));
|
||||
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
markResultForCompositePollEvent(CompositePollEvent.State.COMPLETE);
|
||||
markResultForCompositePollEvent(
|
||||
List.of(
|
||||
CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED,
|
||||
CompositePollEvent.State.COMPLETE
|
||||
)
|
||||
);
|
||||
consumer.poll(Duration.ZERO);
|
||||
verify(backgroundEventReaper).reap(time.milliseconds());
|
||||
}
|
||||
|
@ -2229,10 +2253,33 @@ public class AsyncKafkaConsumerTest {
|
|||
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
|
||||
}
|
||||
|
||||
private void markResultForCompositePollEvent(CompositePollEvent.State result) {
|
||||
private void markResultForCompositePollEvent(CompositePollEvent.State state) {
|
||||
doAnswer(invocation -> {
|
||||
CompositePollEvent event = invocation.getArgument(0);
|
||||
event.future().complete(result);
|
||||
|
||||
if (Thread.currentThread().isInterrupted())
|
||||
event.future().completeExceptionally(new InterruptException("Test interrupt"));
|
||||
else
|
||||
event.future().complete(state);
|
||||
return null;
|
||||
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
|
||||
}
|
||||
|
||||
private void markResultForCompositePollEvent(Collection<CompositePollEvent.State> states) {
|
||||
LinkedList<CompositePollEvent.State> statesQueue = new LinkedList<>(states);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
CompositePollEvent.State state = statesQueue.poll();
|
||||
|
||||
if (state == null)
|
||||
throw new IllegalStateException("The array of " + CompositePollEvent.State.class.getSimpleName() + " did not provide enough values");
|
||||
|
||||
CompositePollEvent event = invocation.getArgument(0);
|
||||
|
||||
if (Thread.currentThread().isInterrupted())
|
||||
event.future().completeExceptionally(new InterruptException("Test interrupt"));
|
||||
else
|
||||
event.future().complete(state);
|
||||
return null;
|
||||
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue