mirror of https://github.com/apache/kafka.git
Refactor poll event handling and metadata error management
Simplifies AsyncKafkaConsumer's CompositePollEventInvoker by removing backoff logic and streamlining state handling. NetworkClientDelegate now uses AtomicReference for metadataError to improve thread safety. ApplicationEventProcessor refines error handling in composite poll events. Updates tests to reflect API changes and exception types.
This commit is contained in:
parent
1e52282c41
commit
2d21fa0fdf
|
@ -202,58 +202,41 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
private class CompositePollEventInvoker {
|
||||
|
||||
private CompositePollEvent latest;
|
||||
private int backoff = -1;
|
||||
|
||||
private void poll(Timer timer) {
|
||||
if (latest == null) {
|
||||
submitEvent(ApplicationEvent.Type.POLL, timer);
|
||||
}
|
||||
|
||||
log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs());
|
||||
|
||||
CompositePollEvent.Result result;
|
||||
|
||||
try {
|
||||
result = latest.resultOrError();
|
||||
log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs());
|
||||
|
||||
CompositePollEvent.Result result = latest.resultOrError();
|
||||
CompositePollEvent.State state = result.state();
|
||||
|
||||
if (state == CompositePollEvent.State.COMPLETE) {
|
||||
// Make sure to clear out the latest request since it's complete.
|
||||
latest = null;
|
||||
} else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) {
|
||||
processBackgroundEvents();
|
||||
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
|
||||
} else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
|
||||
offsetCommitCallbackInvoker.executeCallbacks();
|
||||
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
|
||||
} else if (state == CompositePollEvent.State.UNKNOWN) {
|
||||
throw new KafkaException("Unexpected poll result received");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// If the background thread hit an exception, bubble it up to the user but make sure to clear
|
||||
// out the latest request to signify this one is complete.
|
||||
// If an exception is hit, bubble it up to the user but make sure to clear out the latest request
|
||||
// to signify this one is complete.
|
||||
latest = null;
|
||||
throw ConsumerUtils.maybeWrapAsKafkaException(t);
|
||||
}
|
||||
|
||||
CompositePollEvent.State state = result.state();
|
||||
|
||||
if (state == CompositePollEvent.State.COMPLETE) {
|
||||
// Make sure to clear out the latest request since it's complete.
|
||||
latest = null;
|
||||
|
||||
if (fetchBuffer.isEmpty())
|
||||
submitEvent(ApplicationEvent.Type.POLL, timer);
|
||||
} else if (state == CompositePollEvent.State.UNKNOWN) {
|
||||
latest = null;
|
||||
throw new KafkaException("Unexpected poll result received");
|
||||
} else if (state == CompositePollEvent.State.INCOMPLETE) {
|
||||
if (backoff == -1)
|
||||
backoff = 1;
|
||||
else
|
||||
backoff *= 2;
|
||||
|
||||
long sleep = Math.min(Math.min(backoff, retryBackoffMs), timer.remainingMs());
|
||||
timer.sleep(sleep);
|
||||
} else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) {
|
||||
processBackgroundEvents();
|
||||
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
|
||||
} else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
|
||||
offsetCommitCallbackInvoker.executeCallbacks();
|
||||
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
|
||||
}
|
||||
}
|
||||
|
||||
private void submitEvent(ApplicationEvent.Type type, Timer timer) {
|
||||
long deadlineMs = calculateDeadlineMs(timer);
|
||||
latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type);
|
||||
backoff = -1;
|
||||
applicationEventHandler.add(latest);
|
||||
log.debug("Submitted new {} submitted with {} remaining on timer", latest, timer.remainingMs());
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -70,7 +71,7 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
private final int requestTimeoutMs;
|
||||
private final Queue<UnsentRequest> unsentRequests;
|
||||
private final long retryBackoffMs;
|
||||
private Optional<Exception> metadataError;
|
||||
private final AtomicReference<Exception> metadataError;
|
||||
private final boolean notifyMetadataErrorsViaErrorQueue;
|
||||
private final AsyncConsumerMetrics asyncConsumerMetrics;
|
||||
|
||||
|
@ -91,7 +92,7 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
this.unsentRequests = new ArrayDeque<>();
|
||||
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
|
||||
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
|
||||
this.metadataError = Optional.empty();
|
||||
this.metadataError = new AtomicReference<>();
|
||||
this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue;
|
||||
this.asyncConsumerMetrics = asyncConsumerMetrics;
|
||||
}
|
||||
|
@ -163,7 +164,7 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
if (notifyMetadataErrorsViaErrorQueue) {
|
||||
backgroundEventHandler.add(new ErrorEvent(e));
|
||||
} else {
|
||||
metadataError = Optional.of(e);
|
||||
metadataError.compareAndSet(null, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -249,9 +250,8 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
}
|
||||
|
||||
public Optional<Exception> getAndClearMetadataError() {
|
||||
Optional<Exception> metadataError = this.metadataError;
|
||||
this.metadataError = Optional.empty();
|
||||
return metadataError;
|
||||
Exception exception = metadataError.getAndSet(null);
|
||||
return Optional.ofNullable(exception);
|
||||
}
|
||||
|
||||
public Node leastLoadedNode() {
|
||||
|
|
|
@ -265,14 +265,14 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
applicationEventReaper.add(new CompositePollPsuedoEvent<>(updatePositionsFuture, event.deadlineMs()));
|
||||
|
||||
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
|
||||
if (maybeFailCompositePoll(event) || maybeFailCompositePoll(event, updatePositionsError))
|
||||
if (maybeFailCompositePoll(event, updatePositionsError))
|
||||
return;
|
||||
|
||||
log.debug("Processing {} logic for {}", ApplicationEvent.Type.CREATE_FETCH_REQUESTS, event);
|
||||
|
||||
// If needed, create a fetch request if there's no data in the FetchBuffer.
|
||||
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
|
||||
if (maybeFailCompositePoll(event) || maybeFailCompositePoll(event, fetchError))
|
||||
if (maybeFailCompositePoll(event, fetchError))
|
||||
return;
|
||||
|
||||
event.complete(CompositePollEvent.State.COMPLETE, Optional.empty());
|
||||
|
@ -301,6 +301,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
}
|
||||
|
||||
private boolean maybeFailCompositePoll(CompositePollEvent event, Throwable t) {
|
||||
if (maybeFailCompositePoll(event))
|
||||
return true;
|
||||
|
||||
if (t == null)
|
||||
return false;
|
||||
|
||||
|
|
|
@ -2118,7 +2118,7 @@ public class KafkaConsumerTest {
|
|||
time.sleep(heartbeatIntervalMs);
|
||||
Thread.sleep(heartbeatIntervalMs);
|
||||
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
|
||||
final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
|
||||
final ConsumerRecords<String, String> records = pollForRecords();
|
||||
assertFalse(records.isEmpty());
|
||||
assertFalse(records.nextOffsets().isEmpty());
|
||||
}
|
||||
|
@ -3666,7 +3666,7 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
|
|||
service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ZERO));
|
||||
assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ofSeconds(5)));
|
||||
client.wakeup();
|
||||
consumer.wakeup();
|
||||
} finally {
|
||||
|
|
|
@ -1358,7 +1358,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
|
||||
val consumer = createConsumer()
|
||||
consumer.assign(java.util.List.of(tp))
|
||||
assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
|
||||
assertThrows(classOf[AuthorizationException], () => consumeRecords(consumer))
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||
|
|
Loading…
Reference in New Issue