Merge branch '6.2.x'
This commit is contained in:
commit
d8b05c7eba
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2025 the original author or authors.
|
||||
* Copyright 2002-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -21,7 +21,6 @@ import java.util.ArrayList;
|
|||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.jspecify.annotations.Nullable;
|
||||
|
|
@ -72,19 +71,20 @@ public class ResponseBodyEmitter {
|
|||
|
||||
private @Nullable Handler handler;
|
||||
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.START);
|
||||
|
||||
/** Store send data before handler is initialized. */
|
||||
private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(8);
|
||||
|
||||
/** Store successful completion before the handler is initialized. */
|
||||
private boolean complete;
|
||||
|
||||
/** Store an error before the handler is initialized. */
|
||||
private @Nullable Throwable failure;
|
||||
|
||||
private final TimeoutCallback timeoutCallback = new TimeoutCallback();
|
||||
private final DefaultCallback timeoutCallback = new DefaultCallback();
|
||||
|
||||
private final ErrorCallback errorCallback = new ErrorCallback();
|
||||
|
||||
private final CompletionCallback completionCallback = new CompletionCallback();
|
||||
private final DefaultCallback completionCallback = new DefaultCallback();
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -124,7 +124,7 @@ public class ResponseBodyEmitter {
|
|||
this.earlySendAttempts.clear();
|
||||
}
|
||||
|
||||
if (this.state.get() == State.COMPLETE) {
|
||||
if (this.complete) {
|
||||
if (this.failure != null) {
|
||||
this.handler.completeWithError(this.failure);
|
||||
}
|
||||
|
|
@ -139,12 +139,11 @@ public class ResponseBodyEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
void initializeWithError(Throwable ex) {
|
||||
if (this.state.compareAndSet(State.START, State.COMPLETE)) {
|
||||
this.failure = ex;
|
||||
this.earlySendAttempts.clear();
|
||||
this.errorCallback.accept(ex);
|
||||
}
|
||||
synchronized void initializeWithError(Throwable ex) {
|
||||
this.complete = true;
|
||||
this.failure = ex;
|
||||
this.earlySendAttempts.clear();
|
||||
this.errorCallback.accept(ex);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -182,7 +181,8 @@ public class ResponseBodyEmitter {
|
|||
* @throws java.lang.IllegalStateException wraps any other errors
|
||||
*/
|
||||
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
|
||||
assertNotComplete();
|
||||
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
|
||||
(this.failure != null ? " with error: " + this.failure : ""));
|
||||
if (this.handler != null) {
|
||||
try {
|
||||
this.handler.send(object, mediaType);
|
||||
|
|
@ -209,13 +209,9 @@ public class ResponseBodyEmitter {
|
|||
* @since 6.0.12
|
||||
*/
|
||||
public synchronized void send(Set<DataWithMediaType> items) throws IOException {
|
||||
assertNotComplete();
|
||||
sendInternal(items);
|
||||
}
|
||||
|
||||
private void assertNotComplete() {
|
||||
Assert.state(this.state.get() == State.START, () -> "ResponseBodyEmitter has already completed" +
|
||||
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
|
||||
(this.failure != null ? " with error: " + this.failure : ""));
|
||||
sendInternal(items);
|
||||
}
|
||||
|
||||
private void sendInternal(Set<DataWithMediaType> items) throws IOException {
|
||||
|
|
@ -246,8 +242,9 @@ public class ResponseBodyEmitter {
|
|||
* to complete request processing. It should not be used after container
|
||||
* related events such as an error while {@link #send(Object) sending}.
|
||||
*/
|
||||
public void complete() {
|
||||
if (trySetComplete() && this.handler != null) {
|
||||
public synchronized void complete() {
|
||||
this.complete = true;
|
||||
if (this.handler != null) {
|
||||
this.handler.complete();
|
||||
}
|
||||
}
|
||||
|
|
@ -263,26 +260,20 @@ public class ResponseBodyEmitter {
|
|||
* container related events such as an error while
|
||||
* {@link #send(Object) sending}.
|
||||
*/
|
||||
public void completeWithError(Throwable ex) {
|
||||
if (trySetComplete()) {
|
||||
this.failure = ex;
|
||||
if (this.handler != null) {
|
||||
this.handler.completeWithError(ex);
|
||||
}
|
||||
public synchronized void completeWithError(Throwable ex) {
|
||||
this.complete = true;
|
||||
this.failure = ex;
|
||||
if (this.handler != null) {
|
||||
this.handler.completeWithError(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean trySetComplete() {
|
||||
return (this.state.compareAndSet(State.START, State.COMPLETE) ||
|
||||
(this.state.compareAndSet(State.TIMEOUT, State.COMPLETE)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Register code to invoke when the async request times out. This method is
|
||||
* called from a container thread when an async request times out.
|
||||
* <p>As of 6.2, one can register multiple callbacks for this event.
|
||||
*/
|
||||
public void onTimeout(Runnable callback) {
|
||||
public synchronized void onTimeout(Runnable callback) {
|
||||
this.timeoutCallback.addDelegate(callback);
|
||||
}
|
||||
|
||||
|
|
@ -293,7 +284,7 @@ public class ResponseBodyEmitter {
|
|||
* <p>As of 6.2, one can register multiple callbacks for this event.
|
||||
* @since 5.0
|
||||
*/
|
||||
public void onError(Consumer<Throwable> callback) {
|
||||
public synchronized void onError(Consumer<Throwable> callback) {
|
||||
this.errorCallback.addDelegate(callback);
|
||||
}
|
||||
|
||||
|
|
@ -304,7 +295,7 @@ public class ResponseBodyEmitter {
|
|||
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
|
||||
* <p>As of 6.2, one can register multiple callbacks for this event.
|
||||
*/
|
||||
public void onCompletion(Runnable callback) {
|
||||
public synchronized void onCompletion(Runnable callback) {
|
||||
this.completionCallback.addDelegate(callback);
|
||||
}
|
||||
|
||||
|
|
@ -371,20 +362,19 @@ public class ResponseBodyEmitter {
|
|||
}
|
||||
|
||||
|
||||
private class TimeoutCallback implements Runnable {
|
||||
private class DefaultCallback implements Runnable {
|
||||
|
||||
private final List<Runnable> delegates = new ArrayList<>(1);
|
||||
private List<Runnable> delegates = new ArrayList<>(1);
|
||||
|
||||
public synchronized void addDelegate(Runnable delegate) {
|
||||
public void addDelegate(Runnable delegate) {
|
||||
this.delegates.add(delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.TIMEOUT)) {
|
||||
for (Runnable delegate : this.delegates) {
|
||||
delegate.run();
|
||||
}
|
||||
ResponseBodyEmitter.this.complete = true;
|
||||
for (Runnable delegate : this.delegates) {
|
||||
delegate.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -392,59 +382,19 @@ public class ResponseBodyEmitter {
|
|||
|
||||
private class ErrorCallback implements Consumer<Throwable> {
|
||||
|
||||
private final List<Consumer<Throwable>> delegates = new ArrayList<>(1);
|
||||
private List<Consumer<Throwable>> delegates = new ArrayList<>(1);
|
||||
|
||||
public synchronized void addDelegate(Consumer<Throwable> callback) {
|
||||
public void addDelegate(Consumer<Throwable> callback) {
|
||||
this.delegates.add(callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(Throwable t) {
|
||||
if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) {
|
||||
for (Consumer<Throwable> delegate : this.delegates) {
|
||||
delegate.accept(t);
|
||||
}
|
||||
ResponseBodyEmitter.this.complete = true;
|
||||
for(Consumer<Throwable> delegate : this.delegates) {
|
||||
delegate.accept(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class CompletionCallback implements Runnable {
|
||||
|
||||
private final List<Runnable> delegates = new ArrayList<>(1);
|
||||
|
||||
public synchronized void addDelegate(Runnable delegate) {
|
||||
this.delegates.add(delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) {
|
||||
for (Runnable delegate : this.delegates) {
|
||||
delegate.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Represents a state for {@link ResponseBodyEmitter}.
|
||||
* <p><pre>
|
||||
* START ----+
|
||||
* | |
|
||||
* v |
|
||||
* TIMEOUT |
|
||||
* | |
|
||||
* v |
|
||||
* COMPLETE <--+
|
||||
* </pre>
|
||||
* @since 6.2.4
|
||||
*/
|
||||
private enum State {
|
||||
START,
|
||||
TIMEOUT, // handling a timeout
|
||||
COMPLETE
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue