From c1b191edb0fa579e92c5a76f7c8245b45e4293eb Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 22 Nov 2017 23:25:36 -0500 Subject: [PATCH] Polish and improve logging --- .../AbstractListenerReadPublisher.java | 83 ++++++++++--------- .../AbstractListenerWriteFlushProcessor.java | 54 ++++++------ .../AbstractListenerWriteProcessor.java | 63 +++++++------- .../reactive/ServletServerHttpRequest.java | 2 +- .../reactive/UndertowServerHttpRequest.java | 2 +- .../AbstractListenerWebSocketSession.java | 17 +++- 6 files changed, 126 insertions(+), 95 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 55b6ac9a5d..5be7ea2bb9 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -69,9 +69,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override public void subscribe(Subscriber subscriber) { - if (this.logger.isTraceEnabled()) { - this.logger.trace(this.state + " subscribe: " + subscriber); - } this.state.get().subscribe(this, subscriber); } @@ -79,24 +76,20 @@ public abstract class AbstractListenerReadPublisher implements Publisher { // Methods for sub-classes to delegate to, when async I/O events occur... public final void onDataAvailable() { - if (this.logger.isTraceEnabled()) { - this.logger.trace(this.state + " onDataAvailable"); - } + this.logger.trace("I/O event onDataAvailable"); this.state.get().onDataAvailable(this); } public void onAllDataRead() { - if (this.logger.isTraceEnabled()) { - this.logger.trace(this.state + " onAllDataRead"); - } + this.logger.trace("I/O event onAllDataRead"); this.state.get().onAllDataRead(this); } - public final void onError(Throwable t) { + public final void onError(Throwable ex) { if (this.logger.isTraceEnabled()) { - this.logger.trace(this.state + " onError: " + t); + this.logger.trace("I/O event onError: " + ex); } - this.state.get().onError(this, t); + this.state.get().onError(this, ex); } @@ -142,10 +135,17 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (r != Long.MAX_VALUE) { DEMAND_FIELD_UPDATER.addAndGet(this, -1L); } - Assert.state(this.subscriber != null, "No subscriber"); - this.subscriber.onNext(data); + Subscriber subscriber = this.subscriber; + Assert.state(subscriber != null, "No subscriber"); + if (logger.isTraceEnabled()) { + logger.trace("Data item read, publishing.."); + } + subscriber.onNext(data); } else { + if (logger.isTraceEnabled()) { + logger.trace("No more data to read"); + } return true; } } @@ -153,7 +153,17 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } private boolean changeState(State oldState, State newState) { - return this.state.compareAndSet(oldState, newState); + boolean result = this.state.compareAndSet(oldState, newState); + if (result && logger.isTraceEnabled()) { + logger.trace(oldState + " -> " + newState); + } + return result; + } + + private void changeToDemandState(State oldState) { + if (changeState(oldState, State.DEMAND)) { + checkOnDataAvailable(); + } } private Subscription createSubscription() { @@ -170,7 +180,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override public final void request(long n) { if (logger.isTraceEnabled()) { - logger.trace(state + " request: " + n); + logger.trace("Signal request(" + n + ")"); } state.get().request(AbstractListenerReadPublisher.this, n); } @@ -178,7 +188,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override public final void cancel() { if (logger.isTraceEnabled()) { - logger.trace(state + " cancel"); + logger.trace("Signal cancel()"); } state.get().cancel(AbstractListenerReadPublisher.this); } @@ -217,10 +227,14 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.changeState(SUBSCRIBING, NO_DEMAND); // Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND if (publisher.completionBeforeDemand) { + publisher.logger.trace("Completed before demand"); publisher.state.get().onAllDataRead(publisher); } Throwable ex = publisher.errorBeforeDemand; if (ex != null) { + if (publisher.logger.isTraceEnabled()) { + publisher.logger.trace("Completed with error before demand: " + ex); + } publisher.state.get().onError(publisher, ex); } } @@ -249,9 +263,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); - if (publisher.changeState(this, DEMAND)) { - publisher.checkOnDataAvailable(); - } + publisher.changeToDemandState(this); } } @@ -271,10 +283,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); - if (publisher.changeState(this, DEMAND)) { - publisher.checkOnDataAvailable(); - } - // or else we completed at the same time... + publisher.changeToDemandState(this); } } }, @@ -285,9 +294,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); // Did a concurrent read transition to NO_DEMAND just before us? - if (publisher.changeState(NO_DEMAND, this)) { - publisher.checkOnDataAvailable(); - } + publisher.changeToDemandState(NO_DEMAND); } } @@ -297,17 +304,15 @@ public abstract class AbstractListenerReadPublisher implements Publisher { try { boolean demandAvailable = publisher.readAndPublish(); if (demandAvailable) { - if (publisher.changeState(READING, DEMAND)) { - publisher.checkOnDataAvailable(); - } + publisher.changeToDemandState(READING); } else { publisher.readingPaused(); if (publisher.changeState(READING, NO_DEMAND)) { // Demand may have arrived since readAndPublish returned long r = publisher.demand; - if (r > 0 && publisher.changeState(NO_DEMAND, this)) { - publisher.checkOnDataAvailable(); + if (r > 0) { + publisher.changeToDemandState(NO_DEMAND); } } } @@ -326,9 +331,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); // Did a concurrent read transition to NO_DEMAND just before us? - if (publisher.changeState(NO_DEMAND, DEMAND)) { - publisher.checkOnDataAvailable(); - } + publisher.changeToDemandState(NO_DEMAND); } } }, @@ -372,8 +375,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void onAllDataRead(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { - if (publisher.subscriber != null) { - publisher.subscriber.onComplete(); + Subscriber s = publisher.subscriber; + if (s != null) { + s.onComplete(); } } else { @@ -383,8 +387,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void onError(AbstractListenerReadPublisher publisher, Throwable t) { if (publisher.changeState(this, COMPLETED)) { - if (publisher.subscriber != null) { - publisher.subscriber.onError(t); + Subscriber s = publisher.subscriber; + if (s != null) { + s.onError(t); } } else { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 3d47ef8091..ae39da1986 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -53,40 +53,47 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo private final WriteResultPublisher resultPublisher = new WriteResultPublisher(); - // Subscriber methods... + // Subscriber methods and methods to notify of async I/O events... @Override public final void onSubscribe(Subscription subscription) { - if (logger.isTraceEnabled()) { - logger.trace(this.state + " onSubscribe: " + subscription); - } this.state.get().onSubscribe(this, subscription); } @Override public final void onNext(Publisher publisher) { - if (logger.isTraceEnabled()) { - logger.trace(this.state + " onNext: " + publisher); - } + logger.trace("Received onNext publisher"); this.state.get().onNext(this, publisher); } + /** + * Notify of an error. This can come from the upstream write Publisher or + * from sub-classes as a result of an I/O error. + */ @Override - public final void onError(Throwable t) { + public final void onError(Throwable ex) { if (logger.isTraceEnabled()) { - logger.trace(this.state + " onError: " + t); + logger.trace("Received onError: " + ex); } - this.state.get().onError(this, t); + this.state.get().onError(this, ex); } + /** + * Notify of completion. This can come from the upstream write Publisher or + * from sub-classes as a result of an I/O completion event. + */ @Override public final void onComplete() { - if (logger.isTraceEnabled()) { - logger.trace(this.state + " onComplete"); - } + logger.trace("Received onComplete"); this.state.get().onComplete(this); } + protected void cancel() { + this.logger.trace("Received request to cancel"); + if (this.subscription != null) { + this.subscription.cancel(); + } + } // Publisher method... @@ -96,15 +103,6 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } - // Methods for sub-classes to delegate to, when async I/O events occur... - - protected void cancel() { - if (this.subscription != null) { - this.subscription.cancel(); - } - } - - // Methods for sub-classes to implement or override... /** @@ -147,11 +145,19 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo // Private methods for use in State... private boolean changeState(State oldState, State newState) { - return this.state.compareAndSet(oldState, newState); + boolean result = this.state.compareAndSet(oldState, newState); + if (result && logger.isTraceEnabled()) { + logger.trace(oldState + " -> " + newState); + } + return result; } private void flushIfPossible() { - if (isWritePossible()) { + boolean result = isWritePossible(); + if (logger.isTraceEnabled()) { + logger.trace("isWritePossible[" + result + "]"); + } + if (result) { onFlushPossible(); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index e5041b1c1e..990dcea701 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -58,40 +58,52 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor " + newState); + } + return result; } private void changeStateToComplete(State oldState) { @@ -189,7 +192,11 @@ public abstract class AbstractListenerWriteProcessor implements Processor 0) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index fc22e01086..e6fc815073 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -170,7 +170,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { int read = this.channel.read(byteBuffer); if (logger.isTraceEnabled()) { - logger.trace("read:" + read); + logger.trace("Channel read returned " + read + (read != -1 ? " bytes" : "")); } if (read > 0) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 8b99dae89b..37300cc4f8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -224,8 +224,12 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc protected void checkOnDataAvailable() { resumeReceiving(); if (!this.pendingMessages.isEmpty()) { + logger.trace("checkOnDataAvailable, processing pending messages"); onDataAvailable(); } + else { + logger.trace("checkOnDataAvailable, no pending messages"); + } } @Override @@ -239,8 +243,11 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc return (WebSocketMessage) this.pendingMessages.poll(); } - void handleMessage(WebSocketMessage webSocketMessage) { - if (!this.pendingMessages.offer(webSocketMessage)) { + void handleMessage(WebSocketMessage message) { + if (logger.isTraceEnabled()) { + logger.trace("Received message: " + message); + } + if (!this.pendingMessages.offer(message)) { throw new IllegalStateException("Too many messages received. " + "Please ensure WebSocketSession.receive() is subscribed to."); } @@ -255,6 +262,9 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override protected boolean write(WebSocketMessage message) throws IOException { + if (logger.isTraceEnabled()) { + logger.trace("Sending message " + message); + } return sendMessage(message); } @@ -279,6 +289,9 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc * async completion callback into simple flow control. */ public void setReadyToSend(boolean ready) { + if (ready) { + logger.trace("Send succeeded, ready to send again"); + } this.isReady = ready; } }