Add locking in ServletResponse#flushBuffer

In addition to using the ServletOutputStream, it's also possible to call
ServletResponse#flushBuffer, so the ServletOutputStream wrapper logic needs
to apply there as well.

See gh-32340
This commit is contained in:
rstoyanchev 2024-03-04 13:35:46 +00:00
parent 516a203703
commit 4b96cd28c0
2 changed files with 66 additions and 62 deletions

View File

@ -241,11 +241,11 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
} }
@Override @Override
public ServletOutputStream getOutputStream() { public ServletOutputStream getOutputStream() throws IOException {
if (this.outputStream == null) { if (this.outputStream == null) {
Assert.notNull(this.asyncWebRequest, "Not initialized"); Assert.notNull(this.asyncWebRequest, "Not initialized");
this.outputStream = new LifecycleServletOutputStream( ServletOutputStream delegate = getResponse().getOutputStream();
(HttpServletResponse) getResponse(), this.asyncWebRequest); this.outputStream = new LifecycleServletOutputStream(delegate, this);
} }
return this.outputStream; return this.outputStream;
} }
@ -258,6 +258,46 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
} }
return this.writer; return this.writer;
} }
@Override
public void flushBuffer() throws IOException {
obtainLockAndCheckState();
try {
getResponse().flushBuffer();
}
catch (IOException ex) {
handleIOException(ex, "ServletResponse failed to flushBuffer");
}
finally {
releaseLock();
}
}
private void obtainLockAndCheckState() throws AsyncRequestNotUsableException {
Assert.notNull(this.asyncWebRequest, "Not initialized");
if (this.asyncWebRequest.state != State.NEW) {
this.asyncWebRequest.stateLock.lock();
if (this.asyncWebRequest.state != State.ASYNC) {
this.asyncWebRequest.stateLock.unlock();
throw new AsyncRequestNotUsableException("Response not usable after " +
(this.asyncWebRequest.state == State.COMPLETED ?
"async request completion" : "onError notification") + ".");
}
}
}
void handleIOException(IOException ex, String msg) throws AsyncRequestNotUsableException {
Assert.notNull(this.asyncWebRequest, "Not initialized");
this.asyncWebRequest.transitionToErrorState();
throw new AsyncRequestNotUsableException(msg, ex);
}
void releaseLock() {
Assert.notNull(this.asyncWebRequest, "Not initialized");
if (this.asyncWebRequest.state != State.NEW) {
this.asyncWebRequest.stateLock.unlock();
}
}
} }
@ -267,113 +307,80 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
*/ */
private static final class LifecycleServletOutputStream extends ServletOutputStream { private static final class LifecycleServletOutputStream extends ServletOutputStream {
private final HttpServletResponse delegate; private final ServletOutputStream delegate;
private final StandardServletAsyncWebRequest asyncWebRequest; private final LifecycleHttpServletResponse response;
private LifecycleServletOutputStream(
HttpServletResponse delegate, StandardServletAsyncWebRequest asyncWebRequest) {
private LifecycleServletOutputStream(ServletOutputStream delegate, LifecycleHttpServletResponse response) {
this.delegate = delegate; this.delegate = delegate;
this.asyncWebRequest = asyncWebRequest; this.response = response;
} }
@Override @Override
public boolean isReady() { public boolean isReady() {
return false; return this.delegate.isReady();
} }
@Override @Override
public void setWriteListener(WriteListener writeListener) { public void setWriteListener(WriteListener writeListener) {
throw new UnsupportedOperationException(); this.delegate.setWriteListener(writeListener);
} }
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
obtainLockAndCheckState(); this.response.obtainLockAndCheckState();
try { try {
this.delegate.getOutputStream().write(b); this.delegate.write(b);
} }
catch (IOException ex) { catch (IOException ex) {
handleIOException(ex, "ServletOutputStream failed to write"); this.response.handleIOException(ex, "ServletOutputStream failed to write");
} }
finally { finally {
releaseLock(); this.response.releaseLock();
} }
} }
public void write(byte[] buf, int offset, int len) throws IOException { public void write(byte[] buf, int offset, int len) throws IOException {
obtainLockAndCheckState(); this.response.obtainLockAndCheckState();
try { try {
this.delegate.getOutputStream().write(buf, offset, len); this.delegate.write(buf, offset, len);
} }
catch (IOException ex) { catch (IOException ex) {
handleIOException(ex, "ServletOutputStream failed to write"); this.response.handleIOException(ex, "ServletOutputStream failed to write");
} }
finally { finally {
releaseLock(); this.response.releaseLock();
} }
} }
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
obtainLockAndCheckState(); this.response.obtainLockAndCheckState();
try { try {
this.delegate.getOutputStream().flush(); this.delegate.flush();
} }
catch (IOException ex) { catch (IOException ex) {
handleIOException(ex, "ServletOutputStream failed to flush"); this.response.handleIOException(ex, "ServletOutputStream failed to flush");
} }
finally { finally {
releaseLock(); this.response.releaseLock();
} }
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
obtainLockAndCheckState(); this.response.obtainLockAndCheckState();
try { try {
this.delegate.getOutputStream().close(); this.delegate.close();
} }
catch (IOException ex) { catch (IOException ex) {
handleIOException(ex, "ServletOutputStream failed to close"); this.response.handleIOException(ex, "ServletOutputStream failed to close");
} }
finally { finally {
releaseLock(); this.response.releaseLock();
} }
} }
private void obtainLockAndCheckState() throws AsyncRequestNotUsableException {
if (state() != State.NEW) {
stateLock().lock();
if (state() != State.ASYNC) {
stateLock().unlock();
throw new AsyncRequestNotUsableException("Response not usable after " +
(state() == State.COMPLETED ?
"async request completion" : "onError notification") + ".");
}
}
}
private void handleIOException(IOException ex, String msg) throws AsyncRequestNotUsableException {
this.asyncWebRequest.transitionToErrorState();
throw new AsyncRequestNotUsableException(msg, ex);
}
private void releaseLock() {
if (state() != State.NEW) {
stateLock().unlock();
}
}
private State state() {
return this.asyncWebRequest.state;
}
private Lock stateLock() {
return this.asyncWebRequest.stateLock;
}
} }

View File

@ -488,14 +488,11 @@ public final class WebAsyncManager {
} }
this.asyncWebRequest.startAsync(); this.asyncWebRequest.startAsync();
if (logger.isDebugEnabled()) {
logger.debug("Started async request");
}
} }
private static String formatUri(AsyncWebRequest asyncWebRequest) { private static String formatUri(AsyncWebRequest asyncWebRequest) {
HttpServletRequest request = asyncWebRequest.getNativeRequest(HttpServletRequest.class); HttpServletRequest request = asyncWebRequest.getNativeRequest(HttpServletRequest.class);
return (request != null ? request.getRequestURI() : "servlet container"); return (request != null ? "\"" + request.getRequestURI() + "\"" : "servlet container");
} }