Updates for MetadataErrorNotifiableEvent

This commit is contained in:
Kirk True 2025-10-07 08:52:21 -07:00
parent e7b53865fd
commit 4985c7de17
6 changed files with 27 additions and 38 deletions

View File

@ -199,7 +199,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away.
if (event instanceof MetadataErrorNotifiableEvent) {
if (maybeFailOnMetadataError((MetadataErrorNotifiableEvent) event))
if (maybeFailOnMetadataError(List.of(event)))
continue;
}
applicationEventProcessor.process(event);
@ -372,38 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
/**
* If there is a metadata error, complete all uncompleted events that require subscription metadata.
*/
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
List<MetadataErrorNotifiableEvent> notifiables = new ArrayList<>();
private boolean maybeFailOnMetadataError(List<?> events) {
List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
for (CompletableEvent<?> ce : events) {
if (ce instanceof MetadataErrorNotifiableEvent) {
notifiables.add((MetadataErrorNotifiableEvent) ce);
for (Object obj : events) {
if (obj instanceof MetadataErrorNotifiableEvent) {
filteredEvents.add((MetadataErrorNotifiableEvent) obj);
}
}
if (notifiables.isEmpty())
return;
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
return;
Exception metadataError = metadataErrorOpt.get();
notifiables.forEach(n -> n.completeExceptionallyWithMetadataError(metadataError));
}
/**
* If there is a metadata error, complete this event exceptionally.
*/
private boolean maybeFailOnMetadataError(MetadataErrorNotifiableEvent notifiable) {
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
// Don't get-and-clear the metadata error if there are no events that will be notified.
if (filteredEvents.isEmpty())
return false;
Exception metadataError = metadataErrorOpt.get();
notifiable.completeExceptionallyWithMetadataError(metadataError);
Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
if (andClearMetadataError.isPresent()) {
Exception metadataError = andClearMetadataError.get();
filteredEvents.forEach(e -> e.onMetadataError(metadataError));
return true;
} else {
return false;
}
}
}

View File

@ -28,7 +28,7 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE
}
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
}

View File

@ -85,8 +85,8 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
}
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException));
public void onMetadataError(Exception metadataError) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
}
@Override

View File

@ -44,7 +44,7 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
* or {@link Consumer#position(TopicPartition) position} process.
*/
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
}

View File

@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
}
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
@Override

View File

@ -50,7 +50,7 @@ public interface MetadataErrorNotifiableEvent {
* </li>
* </ul>
*
* @param metadataException Error that originally came from {@link Metadata#maybeThrowAnyException()}
* @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
*/
void completeExceptionallyWithMetadataError(Exception metadataException);
void onMetadataError(Exception metadataError);
}