Polishing

See gh-30393
This commit is contained in:
rstoyanchev 2024-01-16 18:53:01 +00:00
parent c4a34fa26c
commit 682f4715cf
2 changed files with 20 additions and 13 deletions

View File

@ -72,8 +72,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Nullable @Nullable
private volatile Subscriber<? super T> subscriber; private volatile Subscriber<? super T> subscriber;
/** Flag to defer transition to COMPLETED briefly while SUBSCRIBING or READING. */
private volatile boolean completionPending; private volatile boolean completionPending;
/** Flag to defer transition to COMPLETED briefly while SUBSCRIBING or READING. */
@Nullable @Nullable
private volatile Throwable errorPending; private volatile Throwable errorPending;
@ -123,8 +125,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
} }
/** /**
* Subclasses can call this method to delegate a container notification when * Subclasses can call this method to signal onComplete, delegating a
* all data has been read. * notification from the container when all data has been read.
*/ */
public void onAllDataRead() { public void onAllDataRead() {
State state = this.state.get(); State state = this.state.get();
@ -135,7 +137,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
} }
/** /**
* Subclasses can call this to delegate container error notifications. * Subclasses can call this to signal onError, delegating a
* notification from the container for an error.
*/ */
public final void onError(Throwable ex) { public final void onError(Throwable ex) {
State state = this.state.get(); State state = this.state.get();
@ -183,10 +186,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
// Private methods for use in State... // Private methods for use in State...
/** /**
* Read and publish data one at a time until there is no more data, no more * Read and publish data one by one until there are no more items
* demand, or perhaps we completed meanwhile. * to read (i.e. input queue drained), or there is no more demand.
* @return {@code true} if there is more demand; {@code false} if there is * @return {@code true} if there is demand but no more to read, or
* no more demand or we have completed. * {@code false} if there is more to read but lack of demand.
*/ */
private boolean readAndPublish() throws IOException { private boolean readAndPublish() throws IOException {
long r; long r;
@ -269,7 +272,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override @Override
public final void request(long n) { public void request(long n) {
if (rsReadLogger.isTraceEnabled()) { if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "request " + (n != Long.MAX_VALUE ? n : "Long.MAX_VALUE")); rsReadLogger.trace(getLogPrefix() + "request " + (n != Long.MAX_VALUE ? n : "Long.MAX_VALUE"));
} }
@ -277,7 +280,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
} }
@Override @Override
public final void cancel() { public void cancel() {
State state = AbstractListenerReadPublisher.this.state.get(); State state = AbstractListenerReadPublisher.this.state.get();
if (rsReadLogger.isTraceEnabled()) { if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "cancel [" + state + "]"); rsReadLogger.trace(getLogPrefix() + "cancel [" + state + "]");
@ -288,7 +291,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
/** /**
* Represents a state for the {@link Publisher} to be in. * The states that a read {@link Publisher} transitions through.
* <p><pre> * <p><pre>
* UNSUBSCRIBED * UNSUBSCRIBED
* | * |

View File

@ -112,7 +112,8 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override @Override
public Flux<WebSocketMessage> receive() { public Flux<WebSocketMessage> receive() {
return (canSuspendReceiving() ? Flux.from(this.receivePublisher) : return (canSuspendReceiving() ?
Flux.from(this.receivePublisher) :
Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE)); Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE));
} }
@ -240,6 +241,9 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
} }
/**
* Read publisher for inbound WebSocket messages.
*/
private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> { private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {
private volatile Queue<Object> pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get(); private volatile Queue<Object> pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get();
@ -269,7 +273,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override @Override
@Nullable @Nullable
protected WebSocketMessage read() throws IOException { protected WebSocketMessage read() {
return (WebSocketMessage) this.pendingMessages.poll(); return (WebSocketMessage) this.pendingMessages.poll();
} }
@ -304,7 +308,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
/** /**
* Processor to send web socket messages. * Write processor for outbound WebSocket messages.
*/ */
protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> { protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {