diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index e7560d92c14..c1aacca4ce4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -202,58 +202,41 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private class CompositePollEventInvoker { private CompositePollEvent latest; - private int backoff = -1; private void poll(Timer timer) { if (latest == null) { submitEvent(ApplicationEvent.Type.POLL, timer); } - log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs()); - - CompositePollEvent.Result result; - try { - result = latest.resultOrError(); + log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs()); + + CompositePollEvent.Result result = latest.resultOrError(); + CompositePollEvent.State state = result.state(); + + if (state == CompositePollEvent.State.COMPLETE) { + // Make sure to clear out the latest request since it's complete. + latest = null; + } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) { + processBackgroundEvents(); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); + } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) { + offsetCommitCallbackInvoker.executeCallbacks(); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); + } else if (state == CompositePollEvent.State.UNKNOWN) { + throw new KafkaException("Unexpected poll result received"); + } } catch (Throwable t) { - // If the background thread hit an exception, bubble it up to the user but make sure to clear - // out the latest request to signify this one is complete. + // If an exception is hit, bubble it up to the user but make sure to clear out the latest request + // to signify this one is complete. latest = null; throw ConsumerUtils.maybeWrapAsKafkaException(t); } - - CompositePollEvent.State state = result.state(); - - if (state == CompositePollEvent.State.COMPLETE) { - // Make sure to clear out the latest request since it's complete. - latest = null; - - if (fetchBuffer.isEmpty()) - submitEvent(ApplicationEvent.Type.POLL, timer); - } else if (state == CompositePollEvent.State.UNKNOWN) { - latest = null; - throw new KafkaException("Unexpected poll result received"); - } else if (state == CompositePollEvent.State.INCOMPLETE) { - if (backoff == -1) - backoff = 1; - else - backoff *= 2; - - long sleep = Math.min(Math.min(backoff, retryBackoffMs), timer.remainingMs()); - timer.sleep(sleep); - } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) { - processBackgroundEvents(); - result.nextEventType().ifPresent(t -> submitEvent(t, timer)); - } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) { - offsetCommitCallbackInvoker.executeCallbacks(); - result.nextEventType().ifPresent(t -> submitEvent(t, timer)); - } } private void submitEvent(ApplicationEvent.Type type, Timer timer) { long deadlineMs = calculateDeadlineMs(timer); latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type); - backoff = -1; applicationEventHandler.add(latest); log.debug("Submitted new {} submitted with {} remaining on timer", latest, timer.remainingMs()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 31c402df2a6..5f71cd3fbc7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -51,6 +51,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -70,7 +71,7 @@ public class NetworkClientDelegate implements AutoCloseable { private final int requestTimeoutMs; private final Queue unsentRequests; private final long retryBackoffMs; - private Optional metadataError; + private final AtomicReference metadataError; private final boolean notifyMetadataErrorsViaErrorQueue; private final AsyncConsumerMetrics asyncConsumerMetrics; @@ -91,7 +92,7 @@ public class NetworkClientDelegate implements AutoCloseable { this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); - this.metadataError = Optional.empty(); + this.metadataError = new AtomicReference<>(); this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue; this.asyncConsumerMetrics = asyncConsumerMetrics; } @@ -163,7 +164,7 @@ public class NetworkClientDelegate implements AutoCloseable { if (notifyMetadataErrorsViaErrorQueue) { backgroundEventHandler.add(new ErrorEvent(e)); } else { - metadataError = Optional.of(e); + metadataError.compareAndSet(null, e); } } } @@ -249,9 +250,8 @@ public class NetworkClientDelegate implements AutoCloseable { } public Optional getAndClearMetadataError() { - Optional metadataError = this.metadataError; - this.metadataError = Optional.empty(); - return metadataError; + Exception exception = metadataError.getAndSet(null); + return Optional.ofNullable(exception); } public Node leastLoadedNode() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 20af1f70542..035d24c41a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -265,14 +265,14 @@ public class ApplicationEventProcessor implements EventProcessor(updatePositionsFuture, event.deadlineMs())); updatePositionsFuture.whenComplete((__, updatePositionsError) -> { - if (maybeFailCompositePoll(event) || maybeFailCompositePoll(event, updatePositionsError)) + if (maybeFailCompositePoll(event, updatePositionsError)) return; log.debug("Processing {} logic for {}", ApplicationEvent.Type.CREATE_FETCH_REQUESTS, event); // If needed, create a fetch request if there's no data in the FetchBuffer. requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> { - if (maybeFailCompositePoll(event) || maybeFailCompositePoll(event, fetchError)) + if (maybeFailCompositePoll(event, fetchError)) return; event.complete(CompositePollEvent.State.COMPLETE, Optional.empty()); @@ -301,6 +301,9 @@ public class ApplicationEventProcessor implements EventProcessor records = (ConsumerRecords) consumer.poll(Duration.ZERO); + final ConsumerRecords records = pollForRecords(); assertFalse(records.isEmpty()); assertFalse(records.nextOffsets().isEmpty()); } @@ -3666,7 +3666,7 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) { service.execute(() -> consumer.poll(Duration.ofSeconds(5))); try { TimeUnit.SECONDS.sleep(1); - assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ZERO)); + assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ofSeconds(5))); client.wakeup(); consumer.wakeup(); } finally { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index f950362354c..4f09abc89dd 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1358,7 +1358,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val consumer = createConsumer() consumer.assign(java.util.List.of(tp)) - assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) + assertThrows(classOf[AuthorizationException], () => consumeRecords(consumer)) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)