Merge branch '6.2.x'

This commit is contained in:
Brian Clozel 2025-09-09 16:56:29 +02:00
commit 39db0e0af2
2 changed files with 108 additions and 55 deletions

View File

@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.jspecify.annotations.Nullable;
@ -63,6 +65,7 @@ import org.springframework.util.ObjectUtils;
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @author Brian Clozel
* @author Taeik Lim
* @since 4.2
*/
public class ResponseBodyEmitter {
@ -86,6 +89,8 @@ public class ResponseBodyEmitter {
private final DefaultCallback completionCallback = new DefaultCallback();
/** Guards access to write operations on the response. */
protected final Lock writeLock = new ReentrantLock();
/**
* Create a new ResponseBodyEmitter instance.
@ -114,36 +119,48 @@ public class ResponseBodyEmitter {
}
synchronized void initialize(Handler handler) throws IOException {
this.handler = handler;
void initialize(Handler handler) throws IOException {
this.writeLock.lock();
try {
sendInternal(this.earlySendAttempts);
}
finally {
this.earlySendAttempts.clear();
}
this.handler = handler;
if (this.complete) {
if (this.failure != null) {
this.handler.completeWithError(this.failure);
try {
sendInternal(this.earlySendAttempts);
}
finally {
this.earlySendAttempts.clear();
}
if (this.complete) {
if (this.failure != null) {
this.handler.completeWithError(this.failure);
}
else {
this.handler.complete();
}
}
else {
this.handler.complete();
this.handler.onTimeout(this.timeoutCallback);
this.handler.onError(this.errorCallback);
this.handler.onCompletion(this.completionCallback);
}
}
else {
this.handler.onTimeout(this.timeoutCallback);
this.handler.onError(this.errorCallback);
this.handler.onCompletion(this.completionCallback);
finally {
this.writeLock.unlock();
}
}
synchronized void initializeWithError(Throwable ex) {
this.complete = true;
this.failure = ex;
this.earlySendAttempts.clear();
this.errorCallback.accept(ex);
void initializeWithError(Throwable ex) {
this.writeLock.lock();
try {
this.complete = true;
this.failure = ex;
this.earlySendAttempts.clear();
this.errorCallback.accept(ex);
}
finally {
this.writeLock.unlock();
}
}
/**
@ -180,22 +197,28 @@ public class ResponseBodyEmitter {
* @throws IOException raised when an I/O error occurs
* @throws java.lang.IllegalStateException wraps any other errors
*/
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
public void send(Object object, @Nullable MediaType mediaType) throws IOException {
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
if (this.handler != null) {
try {
this.handler.send(object, mediaType);
this.writeLock.lock();
try {
if (this.handler != null) {
try {
this.handler.send(object, mediaType);
}
catch (IOException ex) {
throw ex;
}
catch (Throwable ex) {
throw new IllegalStateException("Failed to send " + object, ex);
}
}
catch (IOException ex) {
throw ex;
}
catch (Throwable ex) {
throw new IllegalStateException("Failed to send " + object, ex);
else {
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
}
}
else {
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
finally {
this.writeLock.unlock();
}
}
@ -208,10 +231,16 @@ public class ResponseBodyEmitter {
* @throws java.lang.IllegalStateException wraps any other errors
* @since 6.0.12
*/
public synchronized void send(Set<DataWithMediaType> items) throws IOException {
public void send(Set<DataWithMediaType> items) throws IOException {
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
sendInternal(items);
this.writeLock.lock();
try {
sendInternal(items);
}
finally {
this.writeLock.unlock();
}
}
private void sendInternal(Set<DataWithMediaType> items) throws IOException {
@ -242,10 +271,16 @@ public class ResponseBodyEmitter {
* to complete request processing. It should not be used after container
* related events such as an error while {@link #send(Object) sending}.
*/
public synchronized void complete() {
this.complete = true;
if (this.handler != null) {
this.handler.complete();
public void complete() {
this.writeLock.lock();
try {
this.complete = true;
if (this.handler != null) {
this.handler.complete();
}
}
finally {
this.writeLock.unlock();
}
}
@ -260,11 +295,17 @@ public class ResponseBodyEmitter {
* container related events such as an error while
* {@link #send(Object) sending}.
*/
public synchronized void completeWithError(Throwable ex) {
this.complete = true;
this.failure = ex;
if (this.handler != null) {
this.handler.completeWithError(ex);
public void completeWithError(Throwable ex) {
this.writeLock.lock();
try {
this.complete = true;
this.failure = ex;
if (this.handler != null) {
this.handler.completeWithError(ex);
}
}
finally {
this.writeLock.unlock();
}
}
@ -273,8 +314,14 @@ public class ResponseBodyEmitter {
* called from a container thread when an async request times out.
* <p>As of 6.2, one can register multiple callbacks for this event.
*/
public synchronized void onTimeout(Runnable callback) {
this.timeoutCallback.addDelegate(callback);
public void onTimeout(Runnable callback) {
this.writeLock.lock();
try {
this.timeoutCallback.addDelegate(callback);
}
finally {
this.writeLock.unlock();
}
}
/**
@ -284,8 +331,14 @@ public class ResponseBodyEmitter {
* <p>As of 6.2, one can register multiple callbacks for this event.
* @since 5.0
*/
public synchronized void onError(Consumer<Throwable> callback) {
this.errorCallback.addDelegate(callback);
public void onError(Consumer<Throwable> callback) {
this.writeLock.lock();
try {
this.errorCallback.addDelegate(callback);
}
finally {
this.writeLock.unlock();
}
}
/**
@ -295,8 +348,14 @@ public class ResponseBodyEmitter {
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
* <p>As of 6.2, one can register multiple callbacks for this event.
*/
public synchronized void onCompletion(Runnable callback) {
this.completionCallback.addDelegate(callback);
public void onCompletion(Runnable callback) {
this.writeLock.lock();
try {
this.completionCallback.addDelegate(callback);
}
finally {
this.writeLock.unlock();
}
}

View File

@ -21,8 +21,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jspecify.annotations.Nullable;
@ -47,10 +45,6 @@ public class SseEmitter extends ResponseBodyEmitter {
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
/** Guards access to write operations on the response. */
private final Lock writeLock = new ReentrantLock();
/**
* Create a new SseEmitter instance.
*/