diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java index c5d5bebacd..ffa21597dd 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java @@ -55,12 +55,12 @@ class RequestBodyPublisher implements Publisher { if (s == null) { throw SpecificationExceptions.spec_2_13_exception(); } - if (subscriber != null) { + if (this.subscriber != null) { s.onError(new IllegalStateException("Only one subscriber allowed")); } - subscriber = s; - subscriber.onSubscribe(new RequestBodySubscription()); + this.subscriber = s; + this.subscriber.onSubscribe(new RequestBodySubscription()); } private class RequestBodySubscription @@ -74,7 +74,7 @@ class RequestBodyPublisher implements Publisher { @Override public void cancel() { - subscriptionClosed = true; + this.subscriptionClosed = true; close(); } @@ -82,7 +82,7 @@ class RequestBodyPublisher implements Publisher { public void request(long n) { BackpressureUtils.checkRequest(n, subscriber); - if (subscriptionClosed) { + if (this.subscriptionClosed) { return; } @@ -96,13 +96,13 @@ class RequestBodyPublisher implements Publisher { } private void doOnNext(ByteBuffer buffer) { - draining = false; + this.draining = false; buffer.flip(); subscriber.onNext(buffer); } private void doOnComplete() { - subscriptionClosed = true; + this.subscriptionClosed = true; try { subscriber.onComplete(); } @@ -112,7 +112,7 @@ class RequestBodyPublisher implements Publisher { } private void doOnError(Throwable t) { - subscriptionClosed = true; + this.subscriptionClosed = true; try { subscriber.onError(t); } @@ -122,19 +122,19 @@ class RequestBodyPublisher implements Publisher { } private void close() { - if (pooledBuffer != null) { - safeClose(pooledBuffer); - pooledBuffer = null; + if (this.pooledBuffer != null) { + safeClose(this.pooledBuffer); + this.pooledBuffer = null; } - if (channel != null) { - safeClose(channel); - channel = null; + if (this.channel != null) { + safeClose(this.channel); + this.channel = null; } } @Override public void run() { - if (subscriptionClosed || draining) { + if (this.subscriptionClosed || this.draining) { return; } @@ -142,12 +142,12 @@ class RequestBodyPublisher implements Publisher { return; } - draining = true; + this.draining = true; - if (channel == null) { - channel = exchange.getRequestChannel(); + if (this.channel == null) { + this.channel = exchange.getRequestChannel(); - if (channel == null) { + if (this.channel == null) { if (exchange.isRequestComplete()) { return; } @@ -157,21 +157,21 @@ class RequestBodyPublisher implements Publisher { } } } - if (pooledBuffer == null) { - pooledBuffer = exchange.getConnection().getByteBufferPool().allocate(); + if (this.pooledBuffer == null) { + this.pooledBuffer = exchange.getConnection().getByteBufferPool().allocate(); } else { - pooledBuffer.getBuffer().clear(); + this.pooledBuffer.getBuffer().clear(); } try { - ByteBuffer buffer = pooledBuffer.getBuffer(); + ByteBuffer buffer = this.pooledBuffer.getBuffer(); int count; do { - count = channel.read(buffer); + count = this.channel.read(buffer); if (count == 0) { - channel.getReadSetter().set(this); - channel.resumeReads(); + this.channel.getReadSetter().set(this); + this.channel.resumeReads(); } else if (count == -1) { if (buffer.position() > 0) { @@ -181,11 +181,11 @@ class RequestBodyPublisher implements Publisher { } else { if (buffer.remaining() == 0) { - if (demand == 0) { - channel.suspendReads(); + if (this.demand == 0) { + this.channel.suspendReads(); } doOnNext(buffer); - if (demand > 0) { + if (this.demand > 0) { scheduleNextMessage(); } break; @@ -200,12 +200,12 @@ class RequestBodyPublisher implements Publisher { @Override public void handleEvent(StreamSourceChannel channel) { - if (subscriptionClosed) { + if (this.subscriptionClosed) { return; } try { - ByteBuffer buffer = pooledBuffer.getBuffer(); + ByteBuffer buffer = this.pooledBuffer.getBuffer(); int count; do { count = channel.read(buffer); @@ -220,11 +220,11 @@ class RequestBodyPublisher implements Publisher { } else { if (buffer.remaining() == 0) { - if (demand == 0) { + if (this.demand == 0) { channel.suspendReads(); } doOnNext(buffer); - if (demand > 0) { + if (this.demand > 0) { scheduleNextMessage(); } break; diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java index 666b8ca7c5..b87f126818 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java @@ -50,7 +50,7 @@ class RequestHandlerAdapter implements io.undertow.server.HttpHandler { new UndertowServerHttpResponse(exchange, responseBodySubscriber); exchange.dispatch(); - httpHandler.handle(request, response).subscribe(new Subscriber() { + this.httpHandler.handle(request, response).subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java index 06f7a76cea..ae42aa1a2d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java @@ -60,29 +60,29 @@ class ResponseBodySubscriber extends BaseSubscriber @Override public void onSubscribe(Subscription s) { super.onSubscribe(s); - subscription = s; - subscription.request(1); + this.subscription = s; + this.subscription.request(1); } @Override public void onNext(ByteBuffer buffer) { super.onNext(buffer); - if (responseChannel == null) { - responseChannel = exchange.getResponseChannel(); + if (this.responseChannel == null) { + this.responseChannel = this.exchange.getResponseChannel(); } - writing.incrementAndGet(); + this.writing.incrementAndGet(); try { int c; do { - c = responseChannel.write(buffer); + c = this.responseChannel.write(buffer); } while (buffer.hasRemaining() && c > 0); if (buffer.hasRemaining()) { - writing.incrementAndGet(); + this.writing.incrementAndGet(); enqueue(buffer); - responseChannel.getWriteSetter().set(this); - responseChannel.resumeWrites(); + this.responseChannel.getWriteSetter().set(this); + this.responseChannel.resumeWrites(); } else { this.subscription.request(1); @@ -93,8 +93,8 @@ class ResponseBodySubscriber extends BaseSubscriber onError(ex); } finally { - writing.decrementAndGet(); - if (closing.get()) { + this.writing.decrementAndGet(); + if (this.closing.get()) { closeIfDone(); } } @@ -103,12 +103,12 @@ class ResponseBodySubscriber extends BaseSubscriber private void enqueue(ByteBuffer src) { do { PooledByteBuffer pooledBuffer = - exchange.getConnection().getByteBufferPool().allocate(); + this.exchange.getConnection().getByteBufferPool().allocate(); ByteBuffer dst = pooledBuffer.getBuffer(); copy(dst, src); dst.flip(); - buffers.add(pooledBuffer); + this.buffers.add(pooledBuffer); } while (src.remaining() > 0); } @@ -124,25 +124,25 @@ class ResponseBodySubscriber extends BaseSubscriber try { int c; do { - ByteBuffer buffer = buffers.peek().getBuffer(); + ByteBuffer buffer = this.buffers.peek().getBuffer(); do { c = channel.write(buffer); } while (buffer.hasRemaining() && c > 0); if (!buffer.hasRemaining()) { - safeClose(buffers.remove()); + safeClose(this.buffers.remove()); } - } while (!buffers.isEmpty() && c > 0); - if (!buffers.isEmpty()) { + } while (!this.buffers.isEmpty() && c > 0); + if (!this.buffers.isEmpty()) { channel.resumeWrites(); } else { - writing.decrementAndGet(); + this.writing.decrementAndGet(); - if (closing.get()) { + if (this.closing.get()) { closeIfDone(); } else { - subscription.request(1); + this.subscription.request(1); } } } @@ -154,10 +154,10 @@ class ResponseBodySubscriber extends BaseSubscriber @Override public void onError(Throwable t) { super.onError(t); - if (!exchange.isResponseStarted() && - exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) { + if (!this.exchange.isResponseStarted() && + this.exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) { - exchange.setStatusCode(INTERNAL_SERVER_ERROR.value()); + this.exchange.setStatusCode(INTERNAL_SERVER_ERROR.value()); } logger.error("ResponseBodySubscriber error", t); } @@ -166,15 +166,15 @@ class ResponseBodySubscriber extends BaseSubscriber public void onComplete() { super.onComplete(); - if (responseChannel != null) { - closing.set(true); + if (this.responseChannel != null) { + this.closing.set(true); closeIfDone(); } } private void closeIfDone() { - if (writing.get() == 0) { - if (closing.compareAndSet(true, false)) { + if (this.writing.get() == 0) { + if (this.closing.compareAndSet(true, false)) { closeChannel(); } } @@ -182,16 +182,16 @@ class ResponseBodySubscriber extends BaseSubscriber private void closeChannel() { try { - responseChannel.shutdownWrites(); + this.responseChannel.shutdownWrites(); - if (!responseChannel.flush()) { - responseChannel.getWriteSetter().set( + if (!this.responseChannel.flush()) { + this.responseChannel.getWriteSetter().set( flushingChannelListener( - o -> safeClose(responseChannel), + o -> safeClose(this.responseChannel), closingChannelExceptionHandler())); - responseChannel.resumeWrites(); + this.responseChannel.resumeWrites(); } - responseChannel = null; + this.responseChannel = null; } catch (IOException ex) { onError(ex); diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java index 262ed92869..6a9a747c18 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java @@ -40,7 +40,7 @@ public class UndertowHttpServer extends HttpServerSupport HttpHandler handler = new RequestHandlerAdapter(getHttpHandler()); - undertowServer = Undertow.builder() + this.undertowServer = Undertow.builder() .addHttpListener(getPort() != -1 ? getPort() : 8080, "localhost") .setHandler(handler) .build(); @@ -49,7 +49,7 @@ public class UndertowHttpServer extends HttpServerSupport @Override public void start() { if (!running) { - undertowServer.start(); + this.undertowServer.start(); running = true; } @@ -58,7 +58,7 @@ public class UndertowHttpServer extends HttpServerSupport @Override public void stop() { if (running) { - undertowServer.stop(); + this.undertowServer.stop(); running = false; } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java index 32b5fa2b99..4e83006676 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java @@ -54,15 +54,15 @@ class UndertowServerHttpRequest implements ReactiveServerHttpRequest { @Override public HttpMethod getMethod() { - return HttpMethod.valueOf(exchange.getRequestMethod().toString()); + return HttpMethod.valueOf(this.exchange.getRequestMethod().toString()); } @Override public URI getURI() { try { - StringBuilder uri = new StringBuilder(exchange.getRequestPath()); - if (StringUtils.hasLength(exchange.getQueryString())) { - uri.append('?').append(exchange.getQueryString()); + StringBuilder uri = new StringBuilder(this.exchange.getRequestPath()); + if (StringUtils.hasLength(this.exchange.getQueryString())) { + uri.append('?').append(this.exchange.getQueryString()); } return new URI(uri.toString()); } @@ -75,7 +75,7 @@ class UndertowServerHttpRequest implements ReactiveServerHttpRequest { public HttpHeaders getHeaders() { if (this.headers == null) { this.headers = new HttpHeaders(); - for (HeaderValues headerValues : exchange.getRequestHeaders()) { + for (HeaderValues headerValues : this.exchange.getRequestHeaders()) { for (String value : headerValues) { this.headers.add(headerValues.getHeaderName().toString(), value); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java index 3c68dc41e5..b3bc0fefe9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java @@ -51,7 +51,7 @@ class UndertowServerHttpResponse implements ReactiveServerHttpResponse { @Override public void setStatusCode(HttpStatus status) { - exchange.setStatusCode(status.value()); + this.exchange.setStatusCode(status.value()); } @Override @@ -103,7 +103,7 @@ class UndertowServerHttpResponse implements ReactiveServerHttpResponse { if (!this.headersWritten) { for (Map.Entry> entry : this.headers.entrySet()) { String headerName = entry.getKey(); - exchange.getResponseHeaders() + this.exchange.getResponseHeaders() .addAll(HttpString.tryFromString(headerName), entry.getValue()); }