diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index f3821c133ce..c3599e3a820 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2025 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.jspecify.annotations.Nullable; @@ -72,19 +71,20 @@ public class ResponseBodyEmitter { private @Nullable Handler handler; - private final AtomicReference state = new AtomicReference<>(State.START); - /** Store send data before handler is initialized. */ private final Set earlySendAttempts = new LinkedHashSet<>(8); + /** Store successful completion before the handler is initialized. */ + private boolean complete; + /** Store an error before the handler is initialized. */ private @Nullable Throwable failure; - private final TimeoutCallback timeoutCallback = new TimeoutCallback(); + private final DefaultCallback timeoutCallback = new DefaultCallback(); private final ErrorCallback errorCallback = new ErrorCallback(); - private final CompletionCallback completionCallback = new CompletionCallback(); + private final DefaultCallback completionCallback = new DefaultCallback(); /** @@ -124,7 +124,7 @@ public class ResponseBodyEmitter { this.earlySendAttempts.clear(); } - if (this.state.get() == State.COMPLETE) { + if (this.complete) { if (this.failure != null) { this.handler.completeWithError(this.failure); } @@ -139,12 +139,11 @@ public class ResponseBodyEmitter { } } - void initializeWithError(Throwable ex) { - if (this.state.compareAndSet(State.START, State.COMPLETE)) { - this.failure = ex; - this.earlySendAttempts.clear(); - this.errorCallback.accept(ex); - } + synchronized void initializeWithError(Throwable ex) { + this.complete = true; + this.failure = ex; + this.earlySendAttempts.clear(); + this.errorCallback.accept(ex); } /** @@ -182,7 +181,8 @@ public class ResponseBodyEmitter { * @throws java.lang.IllegalStateException wraps any other errors */ public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { - assertNotComplete(); + Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + + (this.failure != null ? " with error: " + this.failure : "")); if (this.handler != null) { try { this.handler.send(object, mediaType); @@ -209,13 +209,9 @@ public class ResponseBodyEmitter { * @since 6.0.12 */ public synchronized void send(Set items) throws IOException { - assertNotComplete(); - sendInternal(items); - } - - private void assertNotComplete() { - Assert.state(this.state.get() == State.START, () -> "ResponseBodyEmitter has already completed" + + Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + (this.failure != null ? " with error: " + this.failure : "")); + sendInternal(items); } private void sendInternal(Set items) throws IOException { @@ -246,8 +242,9 @@ public class ResponseBodyEmitter { * to complete request processing. It should not be used after container * related events such as an error while {@link #send(Object) sending}. */ - public void complete() { - if (trySetComplete() && this.handler != null) { + public synchronized void complete() { + this.complete = true; + if (this.handler != null) { this.handler.complete(); } } @@ -263,26 +260,20 @@ public class ResponseBodyEmitter { * container related events such as an error while * {@link #send(Object) sending}. */ - public void completeWithError(Throwable ex) { - if (trySetComplete()) { - this.failure = ex; - if (this.handler != null) { - this.handler.completeWithError(ex); - } + public synchronized void completeWithError(Throwable ex) { + this.complete = true; + this.failure = ex; + if (this.handler != null) { + this.handler.completeWithError(ex); } } - private boolean trySetComplete() { - return (this.state.compareAndSet(State.START, State.COMPLETE) || - (this.state.compareAndSet(State.TIMEOUT, State.COMPLETE))); - } - /** * Register code to invoke when the async request times out. This method is * called from a container thread when an async request times out. *

As of 6.2, one can register multiple callbacks for this event. */ - public void onTimeout(Runnable callback) { + public synchronized void onTimeout(Runnable callback) { this.timeoutCallback.addDelegate(callback); } @@ -293,7 +284,7 @@ public class ResponseBodyEmitter { *

As of 6.2, one can register multiple callbacks for this event. * @since 5.0 */ - public void onError(Consumer callback) { + public synchronized void onError(Consumer callback) { this.errorCallback.addDelegate(callback); } @@ -304,7 +295,7 @@ public class ResponseBodyEmitter { * detecting that a {@code ResponseBodyEmitter} instance is no longer usable. *

As of 6.2, one can register multiple callbacks for this event. */ - public void onCompletion(Runnable callback) { + public synchronized void onCompletion(Runnable callback) { this.completionCallback.addDelegate(callback); } @@ -371,20 +362,19 @@ public class ResponseBodyEmitter { } - private class TimeoutCallback implements Runnable { + private class DefaultCallback implements Runnable { - private final List delegates = new ArrayList<>(1); + private List delegates = new ArrayList<>(1); - public synchronized void addDelegate(Runnable delegate) { + public void addDelegate(Runnable delegate) { this.delegates.add(delegate); } @Override public void run() { - if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.TIMEOUT)) { - for (Runnable delegate : this.delegates) { - delegate.run(); - } + ResponseBodyEmitter.this.complete = true; + for (Runnable delegate : this.delegates) { + delegate.run(); } } } @@ -392,59 +382,19 @@ public class ResponseBodyEmitter { private class ErrorCallback implements Consumer { - private final List> delegates = new ArrayList<>(1); + private List> delegates = new ArrayList<>(1); - public synchronized void addDelegate(Consumer callback) { + public void addDelegate(Consumer callback) { this.delegates.add(callback); } @Override public void accept(Throwable t) { - if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) { - for (Consumer delegate : this.delegates) { - delegate.accept(t); - } + ResponseBodyEmitter.this.complete = true; + for(Consumer delegate : this.delegates) { + delegate.accept(t); } } } - - private class CompletionCallback implements Runnable { - - private final List delegates = new ArrayList<>(1); - - public synchronized void addDelegate(Runnable delegate) { - this.delegates.add(delegate); - } - - @Override - public void run() { - if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) { - for (Runnable delegate : this.delegates) { - delegate.run(); - } - } - } - } - - - /** - * Represents a state for {@link ResponseBodyEmitter}. - *

-	 *     START ----+
-	 *       |       |
-	 *       v       |
-	 *    TIMEOUT    |
-	 *       |       |
-	 *       v       |
-	 *   COMPLETE <--+
-	 * 
- * @since 6.2.4 - */ - private enum State { - START, - TIMEOUT, // handling a timeout - COMPLETE - } - }