Avoid thread pinning in SseEmitter, ResponseBodyEmitter

Closes gh-35423

Signed-off-by: Taeik Lim <sibera21@gmail.com>
This commit is contained in:
Taeik Lim 2025-09-05 00:27:00 +09:00 committed by Brian Clozel
parent 9e8c64011d
commit c788554b1d
2 changed files with 108 additions and 55 deletions

View File

@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
@ -62,6 +64,7 @@ import org.springframework.util.ObjectUtils;
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Juergen Hoeller * @author Juergen Hoeller
* @author Brian Clozel * @author Brian Clozel
* @author Taeik Lim
* @since 4.2 * @since 4.2
*/ */
public class ResponseBodyEmitter { public class ResponseBodyEmitter {
@ -88,6 +91,8 @@ public class ResponseBodyEmitter {
private final DefaultCallback completionCallback = new DefaultCallback(); private final DefaultCallback completionCallback = new DefaultCallback();
/** Guards access to write operations on the response. */
protected final Lock writeLock = new ReentrantLock();
/** /**
* Create a new ResponseBodyEmitter instance. * Create a new ResponseBodyEmitter instance.
@ -117,7 +122,9 @@ public class ResponseBodyEmitter {
} }
synchronized void initialize(Handler handler) throws IOException { void initialize(Handler handler) throws IOException {
this.writeLock.lock();
try {
this.handler = handler; this.handler = handler;
try { try {
@ -141,13 +148,23 @@ public class ResponseBodyEmitter {
this.handler.onCompletion(this.completionCallback); this.handler.onCompletion(this.completionCallback);
} }
} }
finally {
this.writeLock.unlock();
}
}
synchronized void initializeWithError(Throwable ex) { void initializeWithError(Throwable ex) {
this.writeLock.lock();
try {
this.complete = true; this.complete = true;
this.failure = ex; this.failure = ex;
this.earlySendAttempts.clear(); this.earlySendAttempts.clear();
this.errorCallback.accept(ex); this.errorCallback.accept(ex);
} }
finally {
this.writeLock.unlock();
}
}
/** /**
* Invoked after the response is updated with the status code and headers, * Invoked after the response is updated with the status code and headers,
@ -183,9 +200,11 @@ public class ResponseBodyEmitter {
* @throws IOException raised when an I/O error occurs * @throws IOException raised when an I/O error occurs
* @throws java.lang.IllegalStateException wraps any other errors * @throws java.lang.IllegalStateException wraps any other errors
*/ */
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { public void send(Object object, @Nullable MediaType mediaType) throws IOException {
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : "")); (this.failure != null ? " with error: " + this.failure : ""));
this.writeLock.lock();
try {
if (this.handler != null) { if (this.handler != null) {
try { try {
this.handler.send(object, mediaType); this.handler.send(object, mediaType);
@ -201,6 +220,10 @@ public class ResponseBodyEmitter {
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType)); this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
} }
} }
finally {
this.writeLock.unlock();
}
}
/** /**
* Write a set of data and MediaType pairs in a batch. * Write a set of data and MediaType pairs in a batch.
@ -211,11 +234,17 @@ public class ResponseBodyEmitter {
* @throws java.lang.IllegalStateException wraps any other errors * @throws java.lang.IllegalStateException wraps any other errors
* @since 6.0.12 * @since 6.0.12
*/ */
public synchronized void send(Set<DataWithMediaType> items) throws IOException { public void send(Set<DataWithMediaType> items) throws IOException {
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : "")); (this.failure != null ? " with error: " + this.failure : ""));
this.writeLock.lock();
try {
sendInternal(items); sendInternal(items);
} }
finally {
this.writeLock.unlock();
}
}
private void sendInternal(Set<DataWithMediaType> items) throws IOException { private void sendInternal(Set<DataWithMediaType> items) throws IOException {
if (items.isEmpty()) { if (items.isEmpty()) {
@ -245,12 +274,18 @@ public class ResponseBodyEmitter {
* to complete request processing. It should not be used after container * to complete request processing. It should not be used after container
* related events such as an error while {@link #send(Object) sending}. * related events such as an error while {@link #send(Object) sending}.
*/ */
public synchronized void complete() { public void complete() {
this.writeLock.lock();
try {
this.complete = true; this.complete = true;
if (this.handler != null) { if (this.handler != null) {
this.handler.complete(); this.handler.complete();
} }
} }
finally {
this.writeLock.unlock();
}
}
/** /**
* Complete request processing with an error. * Complete request processing with an error.
@ -263,22 +298,34 @@ public class ResponseBodyEmitter {
* container related events such as an error while * container related events such as an error while
* {@link #send(Object) sending}. * {@link #send(Object) sending}.
*/ */
public synchronized void completeWithError(Throwable ex) { public void completeWithError(Throwable ex) {
this.writeLock.lock();
try {
this.complete = true; this.complete = true;
this.failure = ex; this.failure = ex;
if (this.handler != null) { if (this.handler != null) {
this.handler.completeWithError(ex); this.handler.completeWithError(ex);
} }
} }
finally {
this.writeLock.unlock();
}
}
/** /**
* Register code to invoke when the async request times out. This method is * Register code to invoke when the async request times out. This method is
* called from a container thread when an async request times out. * called from a container thread when an async request times out.
* <p>As of 6.2, one can register multiple callbacks for this event. * <p>As of 6.2, one can register multiple callbacks for this event.
*/ */
public synchronized void onTimeout(Runnable callback) { public void onTimeout(Runnable callback) {
this.writeLock.lock();
try {
this.timeoutCallback.addDelegate(callback); this.timeoutCallback.addDelegate(callback);
} }
finally {
this.writeLock.unlock();
}
}
/** /**
* Register code to invoke for an error during async request processing. * Register code to invoke for an error during async request processing.
@ -287,9 +334,15 @@ public class ResponseBodyEmitter {
* <p>As of 6.2, one can register multiple callbacks for this event. * <p>As of 6.2, one can register multiple callbacks for this event.
* @since 5.0 * @since 5.0
*/ */
public synchronized void onError(Consumer<Throwable> callback) { public void onError(Consumer<Throwable> callback) {
this.writeLock.lock();
try {
this.errorCallback.addDelegate(callback); this.errorCallback.addDelegate(callback);
} }
finally {
this.writeLock.unlock();
}
}
/** /**
* Register code to invoke when the async request completes. This method is * Register code to invoke when the async request completes. This method is
@ -298,9 +351,15 @@ public class ResponseBodyEmitter {
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable. * detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
* <p>As of 6.2, one can register multiple callbacks for this event. * <p>As of 6.2, one can register multiple callbacks for this event.
*/ */
public synchronized void onCompletion(Runnable callback) { public void onCompletion(Runnable callback) {
this.writeLock.lock();
try {
this.completionCallback.addDelegate(callback); this.completionCallback.addDelegate(callback);
} }
finally {
this.writeLock.unlock();
}
}
@Override @Override

View File

@ -21,8 +21,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
@ -46,10 +44,6 @@ public class SseEmitter extends ResponseBodyEmitter {
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8); private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
/** Guards access to write operations on the response. */
private final Lock writeLock = new ReentrantLock();
/** /**
* Create a new SseEmitter instance. * Create a new SseEmitter instance.
*/ */