From 0a60c622cc2f21e1ed3e2cb4fdb25a1ffecb1ffd Mon Sep 17 00:00:00 2001 From: gregw Date: Mon, 15 Jan 2024 19:05:09 +1100 Subject: [PATCH] Implement Eclipse Jetty core HTTP handler adapter This provides an implementation of an HTTP Handler Adapter that is coded directly to the Eclipse Jetty core API, bypassing any servlet implementation. This includes a Jetty implementation of the spring `WebSocketClient` interface, `JettyWebSocketClient`, using an explicit dependency to the jetty-websocket-api. Closes gh-32097 Co-authored-by: Lachlan Roberts Co-authored-by: Arjen Poutsma --- framework-platform/framework-platform.gradle | 4 +- spring-core/spring-core.gradle | 1 + .../core/io/buffer/DefaultDataBuffer.java | 2 +- .../core/io/buffer/JettyDataBuffer.java | 359 ++++++++++++++++++ .../io/buffer/JettyDataBufferFactory.java | 108 ++++++ .../core/io/buffer/JettyDataBufferTests.java | 59 +++ .../core/io/buffer/PooledDataBufferTests.java | 13 + spring-web/spring-web.gradle | 1 + .../reactive/JettyClientHttpConnector.java | 296 +-------------- .../http/codec/multipart/MultipartParser.java | 5 +- .../DefaultServerHttpRequestBuilder.java | 4 +- .../reactive/JettyCoreHttpHandlerAdapter.java | 60 +++ .../reactive/JettyCoreServerHttpRequest.java | 120 ++++++ .../reactive/JettyCoreServerHttpResponse.java | 237 ++++++++++++ .../http/support/JettyHeadersAdapter.java | 73 +++- .../ErrorHandlerIntegrationTests.java | 5 +- .../reactive/ZeroCopyIntegrationTests.java | 5 +- .../AbstractHttpHandlerIntegrationTests.java | 1 + .../bootstrap/JettyCoreHttpServer.java | 98 +++++ .../reactive/bootstrap/JettyHttpServer.java | 27 +- spring-webflux/spring-webflux.gradle | 2 + .../adapter/JettyWebSocketHandlerAdapter.java | 93 ++--- .../socket/adapter/JettyWebSocketSession.java | 236 +++++++++--- .../socket/client/JettyWebSocketClient.java | 111 ++++++ .../support/HandshakeWebSocketService.java | 8 + .../JettyCoreRequestUpgradeStrategy.java | 127 +++++++ .../upgrade/JettyRequestUpgradeStrategy.java | 2 +- .../ContextPathIntegrationTests.java | 27 +- .../annotation/SseIntegrationTests.java | 30 +- ...ractReactiveWebSocketIntegrationTests.java | 13 + spring-websocket/spring-websocket.gradle | 1 + .../jetty/JettyWebSocketHandlerAdapter.java | 97 +++-- .../jetty/JettyRequestUpgradeStrategy.java | 2 +- 33 files changed, 1701 insertions(+), 526 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBufferFactory.java create mode 100644 spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java create mode 100644 spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java create mode 100644 spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java create mode 100644 spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java create mode 100644 spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyCoreHttpServer.java create mode 100644 spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java create mode 100644 spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyCoreRequestUpgradeStrategy.java diff --git a/framework-platform/framework-platform.gradle b/framework-platform/framework-platform.gradle index 6754696287..36e9c31c55 100644 --- a/framework-platform/framework-platform.gradle +++ b/framework-platform/framework-platform.gradle @@ -16,8 +16,8 @@ dependencies { api(platform("org.apache.groovy:groovy-bom:4.0.21")) api(platform("org.apache.logging.log4j:log4j-bom:2.21.1")) api(platform("org.assertj:assertj-bom:3.26.0")) - api(platform("org.eclipse.jetty:jetty-bom:12.0.10")) - api(platform("org.eclipse.jetty.ee10:jetty-ee10-bom:12.0.10")) + api(platform("org.eclipse.jetty:jetty-bom:12.0.11")) + api(platform("org.eclipse.jetty.ee10:jetty-ee10-bom:12.0.11")) api(platform("org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.7.3")) api(platform("org.jetbrains.kotlinx:kotlinx-serialization-bom:1.6.0")) api(platform("org.junit:junit-bom:5.10.3")) diff --git a/spring-core/spring-core.gradle b/spring-core/spring-core.gradle index fedd203d55..91710c31c3 100644 --- a/spring-core/spring-core.gradle +++ b/spring-core/spring-core.gradle @@ -81,6 +81,7 @@ dependencies { optional("io.smallrye.reactive:mutiny") optional("net.sf.jopt-simple:jopt-simple") optional("org.aspectj:aspectjweaver") + optional("org.eclipse.jetty:jetty-io") optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-stdlib") optional("org.jetbrains.kotlinx:kotlinx-coroutines-core") diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java index 16b444dfb4..d9d43da4ee 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java @@ -355,7 +355,7 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public DataBuffer split(int index) { + public DefaultDataBuffer split(int index) { checkIndex(index); ByteBuffer split = this.byteBuffer.duplicate().clear() diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java new file mode 100644 index 0000000000..f2eb530631 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java @@ -0,0 +1,359 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntPredicate; + +import org.eclipse.jetty.io.Content; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Implementation of the {@code DataBuffer} interface that can wrap a Jetty + * {@link Content.Chunk}. Typically constructed with {@link JettyDataBufferFactory}. + * + * @author Greg Wilkins + * @author Lachlan Roberts + * @author Arjen Poutsma + * @since 6.2 + */ +public final class JettyDataBuffer implements PooledDataBuffer { + + private final DefaultDataBuffer delegate; + + @Nullable + private final Content.Chunk chunk; + + private final JettyDataBufferFactory bufferFactory; + + private final AtomicInteger refCount = new AtomicInteger(1); + + + JettyDataBuffer(JettyDataBufferFactory bufferFactory, DefaultDataBuffer delegate, Content.Chunk chunk) { + Assert.notNull(bufferFactory, "BufferFactory must not be null"); + Assert.notNull(delegate, "Delegate must not be null"); + Assert.notNull(chunk, "Chunk must not be null"); + + this.bufferFactory = bufferFactory; + this.delegate = delegate; + this.chunk = chunk; + this.chunk.retain(); + } + + JettyDataBuffer(JettyDataBufferFactory bufferFactory, DefaultDataBuffer delegate) { + Assert.notNull(bufferFactory, "BufferFactory must not be null"); + Assert.notNull(delegate, "Delegate must not be null"); + + this.bufferFactory = bufferFactory; + this.delegate = delegate; + this.chunk = null; + } + + @Override + public boolean isAllocated() { + return this.refCount.get() > 0; + } + + @Override + public PooledDataBuffer retain() { + int result = this.refCount.updateAndGet(c -> { + if (c != 0) { + return c + 1; + } + else { + return 0; + } + }); + if (result != 0 && this.chunk != null) { + this.chunk.retain(); + } + return this; + } + + @Override + public PooledDataBuffer touch(Object hint) { + return this; + } + + @Override + public boolean release() { + int result = this.refCount.updateAndGet(c -> { + if (c != 0) { + return c - 1; + } + else { + throw new IllegalStateException("JettyDataBuffer already released: " + this); + } + }); + if (this.chunk != null) { + return this.chunk.release(); + } + else { + return result == 0; + } + } + + @Override + public DataBufferFactory factory() { + return this.bufferFactory; + } + + // delegation + + @Override + public int indexOf(IntPredicate predicate, int fromIndex) { + return this.delegate.indexOf(predicate, fromIndex); + } + + @Override + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + return this.delegate.lastIndexOf(predicate, fromIndex); + } + + @Override + public int readableByteCount() { + return this.delegate.readableByteCount(); + } + + @Override + public int writableByteCount() { + return this.delegate.writableByteCount(); + } + + @Override + public int capacity() { + return this.delegate.capacity(); + } + + @Override + @Deprecated + public DataBuffer capacity(int capacity) { + this.delegate.capacity(capacity); + return this; + } + + @Override + public DataBuffer ensureWritable(int capacity) { + this.delegate.ensureWritable(capacity); + return this; + } + + @Override + public int readPosition() { + return this.delegate.readPosition(); + } + + @Override + public DataBuffer readPosition(int readPosition) { + this.delegate.readPosition(readPosition); + return this; + } + + @Override + public int writePosition() { + return this.delegate.writePosition(); + } + + @Override + public DataBuffer writePosition(int writePosition) { + this.delegate.writePosition(writePosition); + return this; + } + + @Override + public byte getByte(int index) { + return this.delegate.getByte(index); + } + + @Override + public byte read() { + return this.delegate.read(); + } + + @Override + public DataBuffer read(byte[] destination) { + this.delegate.read(destination); + return this; + } + + @Override + public DataBuffer read(byte[] destination, int offset, int length) { + this.delegate.read(destination, offset, length); + return this; + } + + @Override + public DataBuffer write(byte b) { + this.delegate.write(b); + return this; + } + + @Override + public DataBuffer write(byte[] source) { + this.delegate.write(source); + return this; + } + + @Override + public DataBuffer write(byte[] source, int offset, int length) { + this.delegate.write(source, offset, length); + return this; + } + + @Override + public DataBuffer write(DataBuffer... buffers) { + this.delegate.write(buffers); + return this; + } + + @Override + public DataBuffer write(ByteBuffer... buffers) { + this.delegate.write(buffers); + return this; + } + + @Override + @Deprecated + public DataBuffer slice(int index, int length) { + DefaultDataBuffer delegateSlice = this.delegate.slice(index, length); + if (this.chunk != null) { + this.chunk.retain(); + return new JettyDataBuffer(this.bufferFactory, delegateSlice, this.chunk); + } + else { + return new JettyDataBuffer(this.bufferFactory, delegateSlice); + } + } + + @Override + public DataBuffer split(int index) { + DefaultDataBuffer delegateSplit = this.delegate.split(index); + if (this.chunk != null) { + this.chunk.retain(); + return new JettyDataBuffer(this.bufferFactory, delegateSplit, this.chunk); + } + else { + return new JettyDataBuffer(this.bufferFactory, delegateSplit); + } + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer() { + return this.delegate.asByteBuffer(); + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer(int index, int length) { + return this.delegate.asByteBuffer(index, length); + } + + @Override + @Deprecated + public ByteBuffer toByteBuffer(int index, int length) { + return this.delegate.toByteBuffer(index, length); + } + + @Override + public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { + this.delegate.toByteBuffer(srcPos, dest, destPos, length); + } + + @Override + public ByteBufferIterator readableByteBuffers() { + ByteBufferIterator delegateIterator = this.delegate.readableByteBuffers(); + if (this.chunk != null) { + return new JettyByteBufferIterator(delegateIterator, this.chunk); + } + else { + return delegateIterator; + } + } + + @Override + public ByteBufferIterator writableByteBuffers() { + ByteBufferIterator delegateIterator = this.delegate.writableByteBuffers(); + if (this.chunk != null) { + return new JettyByteBufferIterator(delegateIterator, this.chunk); + } + else { + return delegateIterator; + } + } + + @Override + public String toString(int index, int length, Charset charset) { + return this.delegate.toString(index, length, charset); + } + + @Override + public int hashCode() { + return this.delegate.hashCode(); + } + + @Override + public boolean equals(Object o) { + return this == o || (o instanceof JettyDataBuffer other && + this.delegate.equals(other.delegate)); + } + + @Override + public String toString() { + return String.format("JettyDataBuffer (r: %d, w: %d, c: %d)", + readPosition(), writePosition(), capacity()); + } + + private static final class JettyByteBufferIterator implements ByteBufferIterator { + + private final ByteBufferIterator delegate; + + private final Content.Chunk chunk; + + + public JettyByteBufferIterator(ByteBufferIterator delegate, Content.Chunk chunk) { + Assert.notNull(delegate, "Delegate must not be null"); + Assert.notNull(chunk, "Chunk must not be null"); + + this.delegate = delegate; + this.chunk = chunk; + this.chunk.retain(); + } + + + @Override + public void close() { + this.delegate.close(); + this.chunk.release(); + } + + @Override + public boolean hasNext() { + return this.delegate.hasNext(); + } + + @Override + public ByteBuffer next() { + return this.delegate.next(); + } + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBufferFactory.java new file mode 100644 index 0000000000..7034a60f2a --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBufferFactory.java @@ -0,0 +1,108 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.eclipse.jetty.io.Content; + +/** + * Implementation of the {@code DataBufferFactory} interface that creates + * {@link JettyDataBuffer} instances. + * + * @author Arjen Poutsma + * @since 6.2 + */ +public class JettyDataBufferFactory implements DataBufferFactory { + + private final DefaultDataBufferFactory delegate; + + + /** + * Creates a new {@code JettyDataBufferFactory} with default settings. + */ + public JettyDataBufferFactory() { + this(false); + } + + /** + * Creates a new {@code JettyDataBufferFactory}, indicating whether direct + * buffers should be created by {@link #allocateBuffer()} and + * {@link #allocateBuffer(int)}. + * @param preferDirect {@code true} if direct buffers are to be preferred; + * {@code false} otherwise + */ + public JettyDataBufferFactory(boolean preferDirect) { + this(preferDirect, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY); + } + + /** + * Creates a new {@code JettyDataBufferFactory}, indicating whether direct + * buffers should be created by {@link #allocateBuffer()} and + * {@link #allocateBuffer(int)}, and what the capacity is to be used for + * {@link #allocateBuffer()}. + * @param preferDirect {@code true} if direct buffers are to be preferred; + * {@code false} otherwise + */ + public JettyDataBufferFactory(boolean preferDirect, int defaultInitialCapacity) { + this.delegate = new DefaultDataBufferFactory(preferDirect, defaultInitialCapacity); + } + + + @Override + @Deprecated + public JettyDataBuffer allocateBuffer() { + DefaultDataBuffer delegate = this.delegate.allocateBuffer(); + return new JettyDataBuffer(this, delegate); + } + + @Override + public JettyDataBuffer allocateBuffer(int initialCapacity) { + DefaultDataBuffer delegate = this.delegate.allocateBuffer(initialCapacity); + return new JettyDataBuffer(this, delegate); + } + + @Override + public JettyDataBuffer wrap(ByteBuffer byteBuffer) { + DefaultDataBuffer delegate = this.delegate.wrap(byteBuffer); + return new JettyDataBuffer(this, delegate); + } + + @Override + public JettyDataBuffer wrap(byte[] bytes) { + DefaultDataBuffer delegate = this.delegate.wrap(bytes); + return new JettyDataBuffer(this, delegate); + } + + public JettyDataBuffer wrap(Content.Chunk chunk) { + ByteBuffer byteBuffer = chunk.getByteBuffer(); + DefaultDataBuffer delegate = this.delegate.wrap(byteBuffer); + return new JettyDataBuffer(this, delegate, chunk); + } + + @Override + public JettyDataBuffer join(List dataBuffers) { + DefaultDataBuffer delegate = this.delegate.join(dataBuffers); + return new JettyDataBuffer(this, delegate); + } + + @Override + public boolean isDirect() { + return this.delegate.isDirect(); + } +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java new file mode 100644 index 0000000000..456338c109 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java @@ -0,0 +1,59 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.io.Content; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +/** + * @author Arjen Poutsma + */ +public class JettyDataBufferTests { + + private final JettyDataBufferFactory dataBufferFactory = new JettyDataBufferFactory(); + + @Test + void releaseRetainChunk() { + ByteBuffer buffer = ByteBuffer.allocate(3); + Content.Chunk mockChunk = mock(); + given(mockChunk.getByteBuffer()).willReturn(buffer); + given(mockChunk.release()).willReturn(false, false, true); + + + + JettyDataBuffer dataBuffer = this.dataBufferFactory.wrap(mockChunk); + dataBuffer.retain(); + dataBuffer.retain(); + assertThat(dataBuffer.release()).isFalse(); + assertThat(dataBuffer.release()).isFalse(); + assertThat(dataBuffer.release()).isTrue(); + + assertThatIllegalStateException().isThrownBy(dataBuffer::release); + + then(mockChunk).should(times(3)).retain(); + then(mockChunk).should(times(3)).release(); + } +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java index 5f353ad596..87843ca6b6 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java @@ -69,6 +69,15 @@ class PooledDataBufferTests { } } + @Nested + class Jetty implements PooledDataBufferTestingTrait { + + @Override + public DataBufferFactory createDataBufferFactory() { + return new JettyDataBufferFactory(); + } + } + interface PooledDataBufferTestingTrait { @@ -82,10 +91,14 @@ class PooledDataBufferTests { default void retainAndRelease() { PooledDataBuffer buffer = createDataBuffer(1); buffer.write((byte) 'a'); + assertThat(buffer.isAllocated()).isTrue(); buffer.retain(); + assertThat(buffer.isAllocated()).isTrue(); assertThat(buffer.release()).isFalse(); + assertThat(buffer.isAllocated()).isTrue(); assertThat(buffer.release()).isTrue(); + assertThat(buffer.isAllocated()).isFalse(); } @Test diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 03b4a0ff19..7585e176b9 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -72,6 +72,7 @@ dependencies { because("needed by Netty's SelfSignedCertificate on JDK 15+") } testFixturesImplementation("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") + testFixturesImplementation("org.eclipse.jetty.websocket:jetty-websocket-jetty-server") testImplementation(project(":spring-core-test")) testImplementation(testFixtures(project(":spring-beans"))) testImplementation(testFixtures(project(":spring-context"))) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java index a2895f6ded..071a161e90 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -17,24 +17,16 @@ package org.springframework.http.client.reactive; import java.net.URI; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.IntPredicate; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.Request; -import org.eclipse.jetty.io.Content; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; -import org.springframework.core.io.buffer.PooledDataBuffer; -import org.springframework.core.io.buffer.TouchableDataBuffer; +import org.springframework.core.io.buffer.JettyDataBufferFactory; import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -50,7 +42,7 @@ public class JettyClientHttpConnector implements ClientHttpConnector { private final HttpClient httpClient; - private DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance; + private JettyDataBufferFactory bufferFactory = new JettyDataBufferFactory(); /** @@ -103,7 +95,7 @@ public class JettyClientHttpConnector implements ClientHttpConnector { /** * Set the buffer factory to use. */ - public void setBufferFactory(DataBufferFactory bufferFactory) { + public void setBufferFactory(JettyDataBufferFactory bufferFactory) { this.bufferFactory = bufferFactory; } @@ -134,289 +126,9 @@ public class JettyClientHttpConnector implements ClientHttpConnector { private Mono execute(JettyClientHttpRequest request) { return Mono.fromDirect(request.toReactiveRequest() .response((reactiveResponse, chunkPublisher) -> { - Flux content = Flux.from(chunkPublisher).map(this::toDataBuffer); + Flux content = Flux.from(chunkPublisher).map(this.bufferFactory::wrap); return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); })); } - private DataBuffer toDataBuffer(Content.Chunk chunk) { - DataBuffer delegate = this.bufferFactory.wrap(chunk.getByteBuffer()); - return new JettyDataBuffer(delegate, chunk); - } - - - private static final class JettyDataBuffer implements PooledDataBuffer { - - private final DataBuffer delegate; - - private final Content.Chunk chunk; - - private final AtomicInteger refCount = new AtomicInteger(1); - - - public JettyDataBuffer(DataBuffer delegate, Content.Chunk chunk) { - Assert.notNull(delegate, "Delegate must not be null"); - Assert.notNull(chunk, "Chunk must not be null"); - - this.delegate = delegate; - this.chunk = chunk; - } - - @Override - public boolean isAllocated() { - return this.refCount.get() > 0; - } - - @Override - public PooledDataBuffer retain() { - if (this.delegate instanceof PooledDataBuffer pooledDelegate) { - pooledDelegate.retain(); - } - this.chunk.retain(); - this.refCount.getAndUpdate(c -> { - if (c != 0) { - return c + 1; - } - else { - return 0; - } - }); - return this; - } - - @Override - public boolean release() { - if (this.delegate instanceof PooledDataBuffer pooledDelegate) { - pooledDelegate.release(); - } - this.chunk.release(); - int refCount = this.refCount.updateAndGet(c -> { - if (c != 0) { - return c - 1; - } - else { - throw new IllegalStateException("already released " + this); - } - }); - return refCount == 0; - } - - @Override - public PooledDataBuffer touch(Object hint) { - if (this.delegate instanceof TouchableDataBuffer touchableDelegate) { - touchableDelegate.touch(hint); - } - return this; - } - - // delegation - - @Override - public DataBufferFactory factory() { - return this.delegate.factory(); - } - - @Override - public int indexOf(IntPredicate predicate, int fromIndex) { - return this.delegate.indexOf(predicate, fromIndex); - } - - @Override - public int lastIndexOf(IntPredicate predicate, int fromIndex) { - return this.delegate.lastIndexOf(predicate, fromIndex); - } - - @Override - public int readableByteCount() { - return this.delegate.readableByteCount(); - } - - @Override - public int writableByteCount() { - return this.delegate.writableByteCount(); - } - - @Override - public int capacity() { - return this.delegate.capacity(); - } - - @Override - @Deprecated - public DataBuffer capacity(int capacity) { - this.delegate.capacity(capacity); - return this; - } - - @Override - public DataBuffer ensureWritable(int capacity) { - this.delegate.ensureWritable(capacity); - return this; - } - - @Override - public int readPosition() { - return this.delegate.readPosition(); - } - - @Override - public DataBuffer readPosition(int readPosition) { - this.delegate.readPosition(readPosition); - return this; - } - - @Override - public int writePosition() { - return this.delegate.writePosition(); - } - - @Override - public DataBuffer writePosition(int writePosition) { - this.delegate.writePosition(writePosition); - return this; - } - - @Override - public byte getByte(int index) { - return this.delegate.getByte(index); - } - - @Override - public byte read() { - return this.delegate.read(); - } - - @Override - public DataBuffer read(byte[] destination) { - this.delegate.read(destination); - return this; - } - - @Override - public DataBuffer read(byte[] destination, int offset, int length) { - this.delegate.read(destination, offset, length); - return this; - } - - @Override - public DataBuffer write(byte b) { - this.delegate.write(b); - return this; - } - - @Override - public DataBuffer write(byte[] source) { - this.delegate.write(source); - return this; - } - - @Override - public DataBuffer write(byte[] source, int offset, int length) { - this.delegate.write(source, offset, length); - return this; - } - - @Override - public DataBuffer write(DataBuffer... buffers) { - this.delegate.write(buffers); - return this; - } - - @Override - public DataBuffer write(ByteBuffer... buffers) { - this.delegate.write(buffers); - return this; - } - - @Override - @Deprecated - public DataBuffer slice(int index, int length) { - DataBuffer delegateSlice = this.delegate.slice(index, length); - this.chunk.retain(); - return new JettyDataBuffer(delegateSlice, this.chunk); - } - - @Override - public DataBuffer split(int index) { - DataBuffer delegateSplit = this.delegate.split(index); - this.chunk.retain(); - return new JettyDataBuffer(delegateSplit, this.chunk); - } - - @Override - @Deprecated - public ByteBuffer asByteBuffer() { - return this.delegate.asByteBuffer(); - } - - @Override - @Deprecated - public ByteBuffer asByteBuffer(int index, int length) { - return this.delegate.asByteBuffer(index, length); - } - - @Override - @Deprecated - public ByteBuffer toByteBuffer(int index, int length) { - return this.delegate.toByteBuffer(index, length); - } - - @Override - public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { - this.delegate.toByteBuffer(srcPos, dest, destPos, length); - } - - @Override - public ByteBufferIterator readableByteBuffers() { - ByteBufferIterator delegateIterator = this.delegate.readableByteBuffers(); - return new JettyByteBufferIterator(delegateIterator, this.chunk); - } - - @Override - public ByteBufferIterator writableByteBuffers() { - ByteBufferIterator delegateIterator = this.delegate.writableByteBuffers(); - return new JettyByteBufferIterator(delegateIterator, this.chunk); - } - - @Override - public String toString(int index, int length, Charset charset) { - return this.delegate.toString(index, length, charset); - } - - - private static final class JettyByteBufferIterator implements ByteBufferIterator { - - private final ByteBufferIterator delegate; - - private final Content.Chunk chunk; - - - public JettyByteBufferIterator(ByteBufferIterator delegate, Content.Chunk chunk) { - Assert.notNull(delegate, "Delegate must not be null"); - Assert.notNull(chunk, "Chunk must not be null"); - - this.delegate = delegate; - this.chunk = chunk; - this.chunk.retain(); - } - - - @Override - public void close() { - this.delegate.close(); - this.chunk.release(); - } - - @Override - public boolean hasNext() { - return this.delegate.hasNext(); - } - - @Override - public ByteBuffer next() { - return this.delegate.next(); - } - } - } - } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java index 9eb8bff26a..f3112515ee 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java @@ -100,7 +100,6 @@ final class MultipartParser extends BaseSubscriber { return Flux.create(sink -> { MultipartParser parser = new MultipartParser(sink, boundary, maxHeadersSize, headersCharset); sink.onCancel(parser::onSinkCancel); - sink.onRequest(n -> parser.requestBuffer()); buffers.subscribe(parser); }); } @@ -112,7 +111,9 @@ final class MultipartParser extends BaseSubscriber { @Override protected void hookOnSubscribe(Subscription subscription) { - requestBuffer(); + if (this.sink.requestedFromDownstream() > 0) { + requestBuffer(); + } } @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java b/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java index a94e378e3c..59ae6eb3f5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java @@ -30,6 +30,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; @@ -70,7 +71,8 @@ class DefaultServerHttpRequestBuilder implements ServerHttpRequest.Builder { Assert.notNull(original, "ServerHttpRequest is required"); this.uri = original.getURI(); - this.headers = new HttpHeaders(original.getHeaders()); + // original headers can be immutable, so create a copy + this.headers = new HttpHeaders(new LinkedMultiValueMap<>(original.getHeaders())); this.httpMethod = original.getMethod(); this.contextPath = original.getPath().contextPath().value(); this.remoteAddress = original.getRemoteAddress(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java new file mode 100644 index 0000000000..08994a11f6 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java @@ -0,0 +1,60 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; + +import org.springframework.core.io.buffer.JettyDataBufferFactory; +import org.springframework.util.Assert; + +/** + * Adapt {@link HttpHandler} to the Jetty {@link org.eclipse.jetty.server.Handler} abstraction. + * + * @author Greg Wilkins + * @author Lachlan Roberts + * @author Arjen Poutsma + * @since 6.2 + */ +public class JettyCoreHttpHandlerAdapter extends Handler.Abstract.NonBlocking { + + private final HttpHandler httpHandler; + + private JettyDataBufferFactory dataBufferFactory = new JettyDataBufferFactory(); + + + public JettyCoreHttpHandlerAdapter(HttpHandler httpHandler) { + this.httpHandler = httpHandler; + } + + public void setDataBufferFactory(JettyDataBufferFactory dataBufferFactory) { + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null"); + this.dataBufferFactory = dataBufferFactory; + } + + + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception { + this.httpHandler.handle(new JettyCoreServerHttpRequest(request, this.dataBufferFactory), + new JettyCoreServerHttpResponse(response, this.dataBufferFactory)) + .subscribe(unused -> {}, callback::failed, callback::succeeded); + return true; + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java new file mode 100644 index 0000000000..d97832a2ff --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; + +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.Request; +import org.reactivestreams.FlowAdapters; +import reactor.core.publisher.Flux; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.JettyDataBufferFactory; +import org.springframework.http.HttpCookie; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.support.JettyHeadersAdapter; +import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * Adapt an Eclipse Jetty {@link Request} to a {@link org.springframework.http.server.ServerHttpRequest}. + * + * @author Greg Wilkins + * @author Arjen Poutsma + * @since 6.2 + */ +class JettyCoreServerHttpRequest extends AbstractServerHttpRequest { + + private final JettyDataBufferFactory dataBufferFactory; + + private final Request request; + + + public JettyCoreServerHttpRequest(Request request, JettyDataBufferFactory dataBufferFactory) { + super(HttpMethod.valueOf(request.getMethod()), + request.getHttpURI().toURI(), + request.getContext().getContextPath(), + new HttpHeaders(new JettyHeadersAdapter(request.getHeaders()))); + this.dataBufferFactory = dataBufferFactory; + this.request = request; + } + + @Override + protected MultiValueMap initCookies() { + List httpCookies = Request.getCookies(this.request); + if (httpCookies.isEmpty()) { + return CollectionUtils.toMultiValueMap(Collections.emptyMap()); + } + MultiValueMap cookies =new LinkedMultiValueMap<>(); + for (org.eclipse.jetty.http.HttpCookie c : httpCookies) { + cookies.add(c.getName(), new HttpCookie(c.getName(), c.getValue())); + } + return cookies; + } + + @Override + @Nullable + public SslInfo initSslInfo() { + if (this.request.getConnectionMetaData().isSecure() && + this.request.getAttribute(EndPoint.SslSessionData.ATTRIBUTE) instanceof EndPoint.SslSessionData sessionData) { + return new DefaultSslInfo(sessionData.sslSessionId(), sessionData.peerCertificates()); + } + return null; + } + + @SuppressWarnings("unchecked") + @Override + public T getNativeRequest() { + return (T) this.request; + } + + @Override + protected String initId() { + return this.request.getId(); + } + + @Override + @Nullable + public InetSocketAddress getLocalAddress() { + SocketAddress localAddress = this.request.getConnectionMetaData().getLocalSocketAddress(); + return localAddress instanceof InetSocketAddress inet ? inet : null; + } + + @Override + @Nullable + public InetSocketAddress getRemoteAddress() { + SocketAddress remoteAddress = this.request.getConnectionMetaData().getRemoteSocketAddress(); + return remoteAddress instanceof InetSocketAddress inet ? inet : null; + } + + @Override + public Flux getBody() { + // We access the request body as a Flow.Publisher, which is wrapped as an org.reactivestreams.Publisher and + // then wrapped as a Flux. + return Flux.from(FlowAdapters.toPublisher(Content.Source.asPublisher(this.request))) + .map(this.dataBufferFactory::wrap); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java new file mode 100644 index 0000000000..fbf533ec05 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java @@ -0,0 +1,237 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +import org.eclipse.jetty.http.HttpCookie; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.server.HttpCookieUtils; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.JettyDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ResponseCookie; +import org.springframework.http.ZeroCopyHttpOutputMessage; +import org.springframework.http.support.JettyHeadersAdapter; +import org.springframework.lang.Nullable; + +/** + * Adapt an Eclipse Jetty {@link Response} to a {@link org.springframework.http.server.ServerHttpResponse}. + * + * @author Greg Wilkins + * @author Lachlan Roberts + * @since 6.2 + */ +class JettyCoreServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage { + + private final Response response; + + public JettyCoreServerHttpResponse(Response response, JettyDataBufferFactory dataBufferFactory) { + super(dataBufferFactory, new HttpHeaders(new JettyHeadersAdapter(response.getHeaders()))); + this.response = response; + + // remove all existing cookies from the response and add them to the cookie map, to be added back later + for (ListIterator i = this.response.getHeaders().listIterator(); i.hasNext(); ) { + HttpField f = i.next(); + if (f instanceof HttpCookieUtils.SetCookieHttpField setCookieHttpField) { + HttpCookie httpCookie = setCookieHttpField.getHttpCookie(); + ResponseCookie responseCookie = ResponseCookie.from(httpCookie.getName(), httpCookie.getValue()) + .httpOnly(httpCookie.isHttpOnly()) + .domain(httpCookie.getDomain()) + .maxAge(httpCookie.getMaxAge()) + .sameSite(httpCookie.getSameSite().name()) + .secure(httpCookie.isSecure()) + .partitioned(httpCookie.isPartitioned()) + .build(); + this.addCookie(responseCookie); + i.remove(); + } + } + } + + @Override + protected Mono writeWithInternal(Publisher body) { + return Flux.from(body) + .concatMap(this::sendDataBuffer) + .then(); + } + + @Override + protected Mono writeAndFlushWithInternal(Publisher> body) { + return Flux.from(body).concatMap(this::writeWithInternal).then(); + } + + @Override + protected void applyStatusCode() { + HttpStatusCode status = getStatusCode(); + this.response.setStatus(status == null ? 0 : status.value()); + } + + @Override + protected void applyHeaders() { + } + + @Override + protected void applyCookies() { + this.getCookies().values().stream() + .flatMap(List::stream) + .forEach(cookie -> Response.addCookie(this.response, new ResponseHttpCookie(cookie))); + } + + @Override + public Mono writeWith(Path file, long position, long count) { + Callback.Completable callback = new Callback.Completable(); + Mono mono = Mono.fromFuture(callback); + try { + Content.copy(Content.Source.from(null, file, position, count), this.response, callback); + } + catch (Throwable th) { + callback.failed(th); + } + return doCommit(() -> mono); + } + + private Mono sendDataBuffer(DataBuffer dataBuffer) { + return Mono.defer(() -> { + DataBuffer.ByteBufferIterator byteBufferIterator = dataBuffer.readableByteBuffers(); + Callback.Completable callback = new Callback.Completable(); + new IteratingCallback() { + @Override + protected Action process() { + if (!byteBufferIterator.hasNext()) { + return Action.SUCCEEDED; + } + response.write(false, byteBufferIterator.next(), this); + return Action.SCHEDULED; + } + + @Override + protected void onCompleteSuccess() { + byteBufferIterator.close(); + DataBufferUtils.release(dataBuffer); + callback.complete(null); + } + + @Override + protected void onCompleteFailure(Throwable cause) { + byteBufferIterator.close(); + DataBufferUtils.release(dataBuffer); + callback.failed(cause); + } + }.iterate(); + + return Mono.fromFuture(callback); + }); + } + + @SuppressWarnings("unchecked") + @Override + public T getNativeResponse() { + return (T) this.response; + } + + private static class ResponseHttpCookie implements org.eclipse.jetty.http.HttpCookie { + private final ResponseCookie responseCookie; + + public ResponseHttpCookie(ResponseCookie responseCookie) { + this.responseCookie = responseCookie; + } + + public ResponseCookie getResponseCookie() { + return this.responseCookie; + } + + @Override + public String getName() { + return this.responseCookie.getName(); + } + + @Override + public String getValue() { + return this.responseCookie.getValue(); + } + + @Override + public int getVersion() { + return 0; + } + + @Override + public long getMaxAge() { + return this.responseCookie.getMaxAge().toSeconds(); + } + + @Override + @Nullable + public String getComment() { + return null; + } + + @Override + @Nullable + public String getDomain() { + return this.responseCookie.getDomain(); + } + + @Override + @Nullable + public String getPath() { + return this.responseCookie.getPath(); + } + + @Override + public boolean isSecure() { + return this.responseCookie.isSecure(); + } + + @Nullable + @Override + public SameSite getSameSite() { + // Adding non-null return site breaks tests. + return null; + } + + @Override + public boolean isHttpOnly() { + return this.responseCookie.isHttpOnly(); + } + + @Override + public boolean isPartitioned() { + return this.responseCookie.isPartitioned(); + } + + @Override + public Map getAttributes() { + return Collections.emptyMap(); + } + } +} diff --git a/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java b/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java index 57f9b7ab89..83c7b9fff4 100644 --- a/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java @@ -17,9 +17,11 @@ package org.springframework.http.support; import java.util.AbstractSet; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Set; @@ -44,6 +46,9 @@ public final class JettyHeadersAdapter implements MultiValueMap private final HttpFields headers; + @Nullable + private final HttpFields.Mutable mutable; + /** * Creates a new {@code JettyHeadersAdapter} based on the given @@ -53,6 +58,7 @@ public final class JettyHeadersAdapter implements MultiValueMap public JettyHeadersAdapter(HttpFields headers) { Assert.notNull(headers, "Headers must not be null"); this.headers = headers; + this.mutable = headers instanceof HttpFields.Mutable m ? m : null; } @@ -119,22 +125,36 @@ public final class JettyHeadersAdapter implements MultiValueMap @Override public boolean containsKey(Object key) { - return (key instanceof String headerName && this.headers.contains(headerName)); + return (key instanceof String name && this.headers.contains(name)); } @Override public boolean containsValue(Object value) { - return (value instanceof String searchString && - this.headers.stream().anyMatch(field -> field.contains(searchString))); + if (value instanceof String searchString) { + for (HttpField field : this.headers) { + if (field.contains(searchString)) { + return true; + } + } + } + return false; } @Nullable @Override public List get(Object key) { - if (containsKey(key)) { - return this.headers.getValuesList((String) key); + List list = null; + if (key instanceof String name) { + for (HttpField f : this.headers) { + if (f.is(name)) { + if (list == null) { + list = new ArrayList<>(); + } + list.add(f.getValue()); + } + } } - return null; + return list; } @Nullable @@ -142,7 +162,21 @@ public final class JettyHeadersAdapter implements MultiValueMap public List put(String key, List value) { HttpFields.Mutable mutableHttpFields = mutableFields(); List oldValues = get(key); - mutableHttpFields.put(key, value); + + if (oldValues == null) { + switch (value.size()) { + case 0 -> {} + case 1 -> mutableHttpFields.add(key, value.get(0)); + default -> mutableHttpFields.add(key, value); + } + } + else { + switch (value.size()) { + case 0 -> mutableHttpFields.remove(key); + case 1 -> mutableHttpFields.put(key, value.get(0)); + default -> mutableHttpFields.put(key, value); + } + } return oldValues; } @@ -150,12 +184,20 @@ public final class JettyHeadersAdapter implements MultiValueMap @Override public List remove(Object key) { HttpFields.Mutable mutableHttpFields = mutableFields(); + List list = null; if (key instanceof String name) { - List oldValues = get(key); - mutableHttpFields.remove(name); - return oldValues; + for (ListIterator i = mutableHttpFields.listIterator(); i.hasNext(); ) { + HttpField f = i.next(); + if (f.is(name)) { + if (list == null) { + list = new ArrayList<>(); + } + list.add(f.getValue()); + i.remove(); + } + } } - return null; + return list; } @Override @@ -187,6 +229,7 @@ public final class JettyHeadersAdapter implements MultiValueMap public Iterator>> iterator() { return new EntryIterator(); } + @Override public int size() { return headers.size(); @@ -195,16 +238,12 @@ public final class JettyHeadersAdapter implements MultiValueMap } private HttpFields.Mutable mutableFields() { - if (this.headers instanceof HttpFields.Mutable mutableHttpFields) { - return mutableHttpFields; - } - else { + if (this.mutable == null) { throw new IllegalStateException("Immutable headers"); } + return this.mutable; } - - @Override public String toString() { return HttpHeaders.formatHeaders(this); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java index eeb1bda99a..145d6a8ced 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java @@ -27,6 +27,7 @@ import org.springframework.web.client.ResponseErrorHandler; import org.springframework.web.client.RestTemplate; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import static org.assertj.core.api.Assertions.assertThat; @@ -87,8 +88,8 @@ class ErrorHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests { // but an application can apply CompactPathRule via RewriteHandler: // https://www.eclipse.org/jetty/documentation/jetty-11/programming_guide.php - HttpStatus expectedStatus = - (httpServer instanceof JettyHttpServer ? HttpStatus.BAD_REQUEST : HttpStatus.OK); + HttpStatus expectedStatus = (httpServer instanceof JettyHttpServer || httpServer instanceof JettyCoreHttpServer + ? HttpStatus.BAD_REQUEST : HttpStatus.OK); assertThat(response.getStatusCode()).isEqualTo(expectedStatus); } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java index 587825b0b0..eb243953e0 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java @@ -30,6 +30,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage; import org.springframework.web.client.RestTemplate; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer; @@ -54,8 +55,8 @@ class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTests { @ParameterizedHttpServerTest void zeroCopy(HttpServer httpServer) throws Exception { - assumeTrue(httpServer instanceof ReactorHttpServer || httpServer instanceof UndertowHttpServer, - "Zero-copy does not support Servlet"); + assumeTrue(httpServer instanceof ReactorHttpServer || httpServer instanceof UndertowHttpServer + || httpServer instanceof JettyCoreHttpServer, "Zero-copy does not support Servlet"); startServer(httpServer); diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java index f49b91fec5..0aa39aa025 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java @@ -126,6 +126,7 @@ public abstract class AbstractHttpHandlerIntegrationTests { static Stream> httpServers() { return Stream.of( named("Jetty", new JettyHttpServer()), + named("Jetty Core", new JettyCoreHttpServer()), named("Reactor Netty", new ReactorHttpServer()), named("Tomcat", new TomcatHttpServer()), named("Undertow", new UndertowHttpServer()) diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyCoreHttpServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyCoreHttpServer.java new file mode 100644 index 0000000000..488e0e429e --- /dev/null +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyCoreHttpServer.java @@ -0,0 +1,98 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.testfixture.http.server.reactive.bootstrap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.websocket.server.ServerWebSocketContainer; + +import org.springframework.http.server.reactive.JettyCoreHttpHandlerAdapter; + +/** + * @author Rossen Stoyanchev + * @author Sam Brannen + * @author Greg Wilkins + * @since 6.2 + */ +public class JettyCoreHttpServer extends AbstractHttpServer { + + protected Log logger = LogFactory.getLog(getClass().getName()); + + private ArrayByteBufferPool byteBufferPool; + + private Server jettyServer; + + @Override + protected void initServer() { + if (logger.isTraceEnabled()) + this.byteBufferPool = new ArrayByteBufferPool.Tracking(); + this.jettyServer = new Server(null, null, byteBufferPool); + + ServerConnector connector = new ServerConnector(this.jettyServer); + connector.setHost(getHost()); + connector.setPort(getPort()); + this.jettyServer.addConnector(connector); + this.jettyServer.setHandler(createHandlerAdapter()); + + ServerWebSocketContainer.ensure(jettyServer); + } + + private JettyCoreHttpHandlerAdapter createHandlerAdapter() { + return new JettyCoreHttpHandlerAdapter(resolveHttpHandler()); + } + + @Override + protected void startInternal() throws Exception { + this.jettyServer.start(); + setPort(((ServerConnector) this.jettyServer.getConnectors()[0]).getLocalPort()); + } + + @Override + protected void stopInternal() { + boolean wasRunning = this.jettyServer.isRunning(); + try { + this.jettyServer.stop(); + } + catch (Exception ex) { + // ignore + } + + // TODO remove this or make debug only + if (wasRunning && this.byteBufferPool instanceof ArrayByteBufferPool.Tracking tracking) { + if (!tracking.getLeaks().isEmpty()) { + System.err.println("Leaks:\n" + tracking.dumpLeaks()); + throw new IllegalStateException("LEAKS"); + } + } + } + + @Override + protected void resetInternal() { + try { + if (this.jettyServer.isRunning()) { + stopInternal(); + } + this.jettyServer.destroy(); + } + finally { + this.jettyServer = null; + } + } +} diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java index d3912ac2df..12878528ff 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java @@ -54,7 +54,6 @@ public class JettyHttpServer extends AbstractHttpServer { connector.setPort(getPort()); this.jettyServer.addConnector(connector); this.jettyServer.setHandler(this.contextHandler); - this.contextHandler.start(); } private ServletHttpHandlerAdapter createServletAdapter() { @@ -70,24 +69,10 @@ public class JettyHttpServer extends AbstractHttpServer { @Override protected void stopInternal() throws Exception { try { - if (this.contextHandler.isRunning()) { - this.contextHandler.stop(); - } + this.jettyServer.stop(); } - finally { - try { - if (this.jettyServer.isRunning()) { - // Do not configure a large stop timeout. For example, setting a stop timeout - // of 5000 adds an additional 1-2 seconds to the runtime of each test using - // the Jetty sever, resulting in 2-4 extra minutes of overall build time. - this.jettyServer.setStopTimeout(100); - this.jettyServer.stop(); - this.jettyServer.destroy(); - } - } - catch (Exception ex) { - // ignore - } + catch (Exception ex) { + // ignore } } @@ -95,18 +80,14 @@ public class JettyHttpServer extends AbstractHttpServer { protected void resetInternal() { try { if (this.jettyServer.isRunning()) { - // Do not configure a large stop timeout. For example, setting a stop timeout - // of 5000 adds an additional 1-2 seconds to the runtime of each test using - // the Jetty sever, resulting in 2-4 extra minutes of overall build time. - this.jettyServer.setStopTimeout(100); this.jettyServer.stop(); - this.jettyServer.destroy(); } } catch (Exception ex) { throw new IllegalStateException(ex); } finally { + this.jettyServer.destroy(); this.jettyServer = null; this.contextHandler = null; } diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 29b7021f03..93c7ee5b52 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -27,6 +27,8 @@ dependencies { optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") { exclude group: "jakarta.servlet", module: "jakarta.servlet-api" } + optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-server") + optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-client") optional("org.freemarker:freemarker") optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-stdlib") diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 71c0a2c840..1e5ee65e0d 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -22,16 +22,9 @@ import java.nio.charset.StandardCharsets; import java.util.function.Function; import java.util.function.IntPredicate; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.Callback; -import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.eclipse.jetty.websocket.core.OpCode; import org.springframework.core.io.buffer.CloseableDataBuffer; import org.springframework.core.io.buffer.DataBuffer; @@ -44,18 +37,14 @@ import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage.Type; /** - * Jetty {@link WebSocket @WebSocket} handler that delegates events to a + * Jetty {@link org.eclipse.jetty.websocket.api.Session.Listener} handler that delegates events to a * reactive {@link WebSocketHandler} and its session. * * @author Violeta Georgieva * @author Rossen Stoyanchev * @since 5.0 */ -@WebSocket -public class JettyWebSocketHandlerAdapter { - - private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); - +public class JettyWebSocketHandlerAdapter implements Session.Listener { private final WebSocketHandler delegateHandler; @@ -74,70 +63,62 @@ public class JettyWebSocketHandlerAdapter { this.sessionFactory = sessionFactory; } - - @OnWebSocketOpen + @Override public void onWebSocketOpen(Session session) { - this.delegateSession = this.sessionFactory.apply(session); - this.delegateHandler.handle(this.delegateSession) + JettyWebSocketSession delegateSession = this.sessionFactory.apply(session); + this.delegateSession = delegateSession; + this.delegateHandler.handle(delegateSession) .checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]") - .subscribe(this.delegateSession); + .subscribe(unused -> {}, delegateSession::onHandlerError, delegateSession::onHandleComplete); } - @OnWebSocketMessage + @Override public void onWebSocketText(String message) { - if (this.delegateSession != null) { - byte[] bytes = message.getBytes(StandardCharsets.UTF_8); - DataBuffer buffer = this.delegateSession.bufferFactory().wrap(bytes); - WebSocketMessage webSocketMessage = new WebSocketMessage(Type.TEXT, buffer); - this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + byte[] bytes = message.getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = this.delegateSession.bufferFactory().wrap(bytes); + WebSocketMessage webSocketMessage = new WebSocketMessage(Type.TEXT, buffer); + this.delegateSession.handleMessage(webSocketMessage); } - @OnWebSocketMessage + @Override public void onWebSocketBinary(ByteBuffer byteBuffer, Callback callback) { - if (this.delegateSession != null) { - DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer); - buffer = new JettyDataBuffer(buffer, callback); - WebSocketMessage webSocketMessage = new WebSocketMessage(Type.BINARY, buffer); - this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer); + buffer = new JettyCallbackDataBuffer(buffer, callback); + WebSocketMessage webSocketMessage = new WebSocketMessage(Type.BINARY, buffer); + this.delegateSession.handleMessage(webSocketMessage); } - @OnWebSocketFrame - public void onWebSocketFrame(Frame frame, Callback callback) { - if (this.delegateSession != null) { - if (OpCode.PONG == frame.getOpCode()) { - ByteBuffer byteBuffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD); - DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer); - buffer = new JettyDataBuffer(buffer, callback); - WebSocketMessage webSocketMessage = new WebSocketMessage(Type.PONG, buffer); - this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); - } - } + @Override + public void onWebSocketPong(ByteBuffer payload) { + Assert.state(this.delegateSession != null, "No delegate session available"); + DataBuffer buffer = this.delegateSession.bufferFactory().wrap(BufferUtil.copy(payload)); + WebSocketMessage webSocketMessage = new WebSocketMessage(Type.PONG, buffer); + this.delegateSession.handleMessage(webSocketMessage); } - @OnWebSocketClose + @Override public void onWebSocketClose(int statusCode, String reason) { - if (this.delegateSession != null) { - this.delegateSession.handleClose(CloseStatus.create(statusCode, reason)); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + this.delegateSession.handleClose(CloseStatus.create(statusCode, reason)); } - @OnWebSocketError + @Override public void onWebSocketError(Throwable cause) { - if (this.delegateSession != null) { - this.delegateSession.handleError(cause); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + this.delegateSession.handleError(cause); } - private static final class JettyDataBuffer implements CloseableDataBuffer { + private static final class JettyCallbackDataBuffer implements CloseableDataBuffer { private final DataBuffer delegate; private final Callback callback; - public JettyDataBuffer(DataBuffer delegate, Callback callback) { + + public JettyCallbackDataBuffer(DataBuffer delegate, Callback callback) { Assert.notNull(delegate, "'delegate` must not be null"); Assert.notNull(callback, "Callback must not be null"); this.delegate = delegate; @@ -272,13 +253,13 @@ public class JettyWebSocketHandlerAdapter { @Deprecated public DataBuffer slice(int index, int length) { DataBuffer delegateSlice = this.delegate.slice(index, length); - return new JettyDataBuffer(delegateSlice, this.callback); + return new JettyCallbackDataBuffer(delegateSlice, this.callback); } @Override public DataBuffer split(int index) { DataBuffer delegateSplit = this.delegate.split(index); - return new JettyDataBuffer(delegateSplit, this.callback); + return new JettyCallbackDataBuffer(delegateSplit, this.callback); } @Override diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index d6adca7e92..35aeac20a8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -16,18 +16,26 @@ package org.springframework.web.reactive.socket.adapter; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.websocket.api.Callback; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.HandshakeInfo; @@ -36,13 +44,29 @@ import org.springframework.web.reactive.socket.WebSocketSession; /** * Spring {@link WebSocketSession} implementation that adapts to a Jetty - * WebSocket {@link org.eclipse.jetty.websocket.api.Session}. + * WebSocket {@link Session}. * * @author Violeta Georgieva * @author Rossen Stoyanchev * @since 5.0 */ -public class JettyWebSocketSession extends AbstractListenerWebSocketSession { +public class JettyWebSocketSession extends AbstractWebSocketSession { + + private final Flux flux; + + private final Sinks.One closeStatusSink = Sinks.one(); + + private final Lock lock = new ReentrantLock(); + + private long requested = 0; + + private boolean awaitingMessage = false; + + @Nullable + private FluxSink sink; + + @Nullable + private final Sinks.Empty handlerCompletionSink; public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { this(session, info, factory, null); @@ -51,52 +75,88 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession completionSink) { - super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionSink); - // TODO: suspend causes failures if invoked at this stage - // suspendReceiving(); - } + super(session, ObjectUtils.getIdentityHexString(session), info, factory); + this.handlerCompletionSink = completionSink; + this.flux = Flux.create(emitter -> { + this.sink = emitter; + emitter.onRequest(n -> { + boolean demand = false; + this.lock.lock(); + try { + this.requested = Math.addExact(this.requested, n); + if (this.requested < 0L) { + this.requested = Long.MAX_VALUE; + } - - @Override - protected boolean canSuspendReceiving() { - // Jetty 12 TODO: research suspend functionality in Jetty 12 - return false; - } - - @Override - protected void suspendReceiving() { - } - - @Override - protected void resumeReceiving() { - } - - @Override - protected boolean sendMessage(WebSocketMessage message) throws IOException { - DataBuffer dataBuffer = message.getPayload(); - Session session = getDelegate(); - if (WebSocketMessage.Type.TEXT.equals(message.getType())) { - getSendProcessor().setReadyToSend(false); - String text = dataBuffer.toString(StandardCharsets.UTF_8); - session.sendText(text, new SendProcessorCallback()); - } - else { - if (WebSocketMessage.Type.BINARY.equals(message.getType())) { - getSendProcessor().setReadyToSend(false); - } - try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { - while (iterator.hasNext()) { - ByteBuffer byteBuffer = iterator.next(); - switch (message.getType()) { - case BINARY -> session.sendBinary(byteBuffer, new SendProcessorCallback()); - case PING -> session.sendPing(byteBuffer, new SendProcessorCallback()); - case PONG -> session.sendPong(byteBuffer, new SendProcessorCallback()); - default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + if (!this.awaitingMessage && this.requested > 0) { + if (this.requested != Long.MAX_VALUE) { + this.requested--; + } + this.awaitingMessage = true; + demand = true; } } + finally { + this.lock.unlock(); + } + + if (demand) { + getDelegate().demand(); + } + }); + }); + } + + void handleMessage(WebSocketMessage message) { + Assert.state(this.sink != null, "No sink available"); + this.sink.next(message); + + boolean demand = false; + this.lock.lock(); + try { + if (!this.awaitingMessage) { + throw new IllegalStateException(); + } + this.awaitingMessage = false; + if (this.requested > 0) { + if (this.requested != Long.MAX_VALUE) { + this.requested--; + } + this.awaitingMessage = true; + demand = true; } } - return true; + finally { + this.lock.unlock(); + } + + if (demand) { + getDelegate().demand(); + } + } + + void handleError(Throwable ex) { + } + + void handleClose(CloseStatus closeStatus) { + this.closeStatusSink.tryEmitValue(closeStatus); + if (this.sink != null) { + this.sink.complete(); + } + } + + void onHandlerError(Throwable error) { + if (JettyWebSocketSession.this.handlerCompletionSink != null) { + JettyWebSocketSession.this.handlerCompletionSink.tryEmitError(error); + } + getDelegate().close(StatusCode.SERVER_ERROR, error.getMessage(), Callback.NOOP); + } + + void onHandleComplete() { + if (JettyWebSocketSession.this.handlerCompletionSink != null) { + JettyWebSocketSession.this.handlerCompletionSink.tryEmitEmpty(); + } + getDelegate().close(StatusCode.NORMAL, null, Callback.NOOP); } @Override @@ -108,25 +168,81 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession close(CloseStatus status) { Callback.Completable callback = new Callback.Completable(); getDelegate().close(status.getCode(), status.getReason(), callback); - return Mono.fromFuture(callback); } - - private final class SendProcessorCallback implements Callback { - - @Override - public void fail(Throwable x) { - getSendProcessor().cancel(); - getSendProcessor().onError(x); - } - - @Override - public void succeed() { - getSendProcessor().setReadyToSend(true); - getSendProcessor().onWritePossible(); - } - + @Override + public Mono closeStatus() { + return this.closeStatusSink.asMono(); } + @Override + public Flux receive() { + return this.flux; + } + + @Override + public Mono send(Publisher messages) { + return Flux.from(messages) + .flatMap(this::sendMessage, 1) + .then(); + } + + protected Mono sendMessage(WebSocketMessage message) { + + Callback.Completable completable = new Callback.Completable(); + DataBuffer dataBuffer = message.getPayload(); + Session session = getDelegate(); + if (WebSocketMessage.Type.TEXT.equals(message.getType())) { + String text = dataBuffer.toString(StandardCharsets.UTF_8); + session.sendText(text, completable); + } + else { + switch (message.getType()) { + case BINARY -> { + @SuppressWarnings("resource") + DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers(); + new IteratingCallback() { + @Override + protected Action process() { + if (!iterator.hasNext()) { + return Action.SUCCEEDED; + } + + ByteBuffer buffer = iterator.next(); + boolean last = iterator.hasNext(); + session.sendPartialBinary(buffer, last, Callback.from(this::succeeded, this::failed)); + return Action.SCHEDULED; + } + + @Override + protected void onCompleteSuccess() { + iterator.close(); + completable.succeed(); + } + + @Override + protected void onCompleteFailure(Throwable cause) { + iterator.close(); + completable.fail(cause); + } + }.iterate(); + } + case PING -> { + // Maximum size of Control frame payload is 125, per RFC 6455. + ByteBuffer buffer = BufferUtil.allocate(125); + dataBuffer.toByteBuffer(buffer); + session.sendPing(buffer, completable); + } + case PONG -> { + // Maximum size of Control frame payload is 125, per RFC 6455. + ByteBuffer buffer = BufferUtil.allocate(125); + dataBuffer.toByteBuffer(buffer); + session.sendPong(buffer, completable); + } + default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + return Mono.fromFuture(completable); + } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java new file mode 100644 index 0000000000..bc770de681 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java @@ -0,0 +1,111 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.client; + +import java.io.IOException; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.Response; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.JettyUpgradeListener; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import org.springframework.context.Lifecycle; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.lang.Nullable; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; + +public class JettyWebSocketClient implements WebSocketClient, Lifecycle { + + private final org.eclipse.jetty.websocket.client.WebSocketClient client; + + public JettyWebSocketClient() { + this(new org.eclipse.jetty.websocket.client.WebSocketClient()); + } + + public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient client) { + this.client = client; + } + + @Override + public void start() { + LifeCycle.start(this.client); + } + + @Override + public void stop() { + LifeCycle.stop(this.client); + } + + @Override + public boolean isRunning() { + return this.client.isRunning(); + } + + @Override + public Mono execute(URI url, WebSocketHandler handler) { + return execute(url, null, handler); + } + + @Override + public Mono execute(URI url, @Nullable HttpHeaders headers, WebSocketHandler handler) { + + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols(handler.getSubProtocols()); + if (headers != null) { + headers.keySet().forEach(header -> upgradeRequest.setHeader(header, headers.getValuesAsList(header))); + } + + final AtomicReference handshakeInfo = new AtomicReference<>(); + JettyUpgradeListener jettyUpgradeListener = new JettyUpgradeListener() { + @Override + public void onHandshakeResponse(Request request, Response response) { + String protocol = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL); + HttpHeaders responseHeaders = new HttpHeaders(); + response.getHeaders().forEach(header -> responseHeaders.add(header.getName(), header.getValue())); + handshakeInfo.set(new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol)); + } + }; + + Sinks.Empty completion = Sinks.empty(); + JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(handler, session -> + new JettyWebSocketSession(session, Objects.requireNonNull(handshakeInfo.get()), DefaultDataBufferFactory.sharedInstance, completion)); + try { + this.client.connect(handlerAdapter, url, upgradeRequest, jettyUpgradeListener) + .exceptionally(throwable -> { + // Only fail the completion if we have an error + // as the JettyWebSocketSession will never be opened. + completion.tryEmitError(throwable); + return null; + }); + return completion.asMono(); + } + catch (IOException ex) { + return Mono.error(ex); + } + } +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java index 81b5326e81..603d8e6183 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java @@ -43,6 +43,7 @@ import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.WebSocketService; +import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; @@ -76,6 +77,8 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle { private static final boolean jettyWsPresent; + private static final boolean jettyCoreWsPresent; + private static final boolean undertowWsPresent; private static final boolean reactorNettyPresent; @@ -88,6 +91,8 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle { "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader); jettyWsPresent = ClassUtils.isPresent( "org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer", classLoader); + jettyCoreWsPresent = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.server.ServerWebSocketContainer", classLoader); undertowWsPresent = ClassUtils.isPresent( "io.undertow.websockets.WebSocketProtocolHandshakeHandler", classLoader); reactorNettyPresent = ClassUtils.isPresent( @@ -278,6 +283,9 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle { else if (jettyWsPresent) { return new JettyRequestUpgradeStrategy(); } + else if (jettyCoreWsPresent) { + return new JettyCoreRequestUpgradeStrategy(); + } else if (undertowWsPresent) { return new UndertowRequestUpgradeStrategy(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyCoreRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyCoreRequestUpgradeStrategy.java new file mode 100644 index 0000000000..509981c6dc --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyCoreRequestUpgradeStrategy.java @@ -0,0 +1,127 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.server.upgrade; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.api.Configurable; +import org.eclipse.jetty.websocket.api.exceptions.WebSocketException; +import org.eclipse.jetty.websocket.server.ServerWebSocketContainer; +import org.eclipse.jetty.websocket.server.WebSocketCreator; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponseDecorator; +import org.springframework.lang.Nullable; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +/** + * A WebSocket {@code RequestUpgradeStrategy} for Jetty 12 Core. + * + * @author Rossen Stoyanchev + * @since 5.3.4 + */ +public class JettyCoreRequestUpgradeStrategy implements RequestUpgradeStrategy { + + @Nullable + private Consumer webSocketConfigurer; + + @Nullable + private ServerWebSocketContainer serverContainer; + + /** + * Add a callback to configure WebSocket server parameters on + * {@link JettyWebSocketServerContainer}. + * @since 6.1 + */ + public void addWebSocketConfigurer(Consumer webSocketConfigurer) { + this.webSocketConfigurer = (this.webSocketConfigurer != null ? + this.webSocketConfigurer.andThen(webSocketConfigurer) : webSocketConfigurer); + } + + @Override + public Mono upgrade( + ServerWebExchange exchange, WebSocketHandler handler, + @Nullable String subProtocol, Supplier handshakeInfoFactory) { + + ServerHttpRequest request = exchange.getRequest(); + ServerHttpResponse response = exchange.getResponse(); + + Request jettyRequest = ServerHttpRequestDecorator.getNativeRequest(request); + Response jettyResponse = ServerHttpResponseDecorator.getNativeResponse(response); + + HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); + DataBufferFactory factory = response.bufferFactory(); + + // Trigger WebFlux preCommit actions before upgrade + return exchange.getResponse().setComplete() + .then(Mono.deferContextual(contextView -> { + JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter( + ContextWebSocketHandler.decorate(handler, contextView), + session -> new JettyWebSocketSession(session, handshakeInfo, factory)); + + WebSocketCreator webSocketCreator = (upgradeRequest, upgradeResponse, callback) -> { + if (subProtocol != null) { + upgradeResponse.setAcceptedSubProtocol(subProtocol); + } + return adapter; + }; + + Callback.Completable callback = new Callback.Completable(); + Mono mono = Mono.fromFuture(callback); + ServerWebSocketContainer container = getWebSocketServerContainer(jettyRequest); + try { + if (!container.upgrade(webSocketCreator, jettyRequest, jettyResponse, callback)) { + throw new WebSocketException("request could not be upgraded to websocket"); + } + } + catch (WebSocketException ex) { + callback.failed(ex); + } + + return mono; + })); + } + + private ServerWebSocketContainer getWebSocketServerContainer(Request jettyRequest) { + if (this.serverContainer == null) { + Server server = jettyRequest.getConnectionMetaData().getConnector().getServer(); + ServerWebSocketContainer container = ServerWebSocketContainer.get(server.getContext()); + if (this.webSocketConfigurer != null) { + this.webSocketConfigurer.accept(container); + } + this.serverContainer = container; + } + return this.serverContainer; + } + +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java index 66b41aa3ac..46d8b70903 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java @@ -42,7 +42,7 @@ import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; /** - * A WebSocket {@code RequestUpgradeStrategy} for Jetty 11. + * A WebSocket {@code RequestUpgradeStrategy} for Jetty 12 EE10. * * @author Rossen Stoyanchev * @since 5.3.4 diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java index 293cbdeec5..06c01fbfef 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java @@ -16,7 +16,12 @@ package org.springframework.web.reactive.result.method.annotation; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; @@ -28,10 +33,16 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Named.named; /** * Integration tests related to the use of context paths. @@ -40,15 +51,25 @@ import static org.assertj.core.api.Assertions.assertThat; */ class ContextPathIntegrationTests { - @Test - void multipleWebFluxApps() throws Exception { + static Stream> httpServers() { + return Stream.of( + named("Jetty", new JettyHttpServer()), + named("Jetty Core", new JettyCoreHttpServer()), + named("Reactor Netty", new ReactorHttpServer()), + named("Tomcat", new TomcatHttpServer()), + named("Undertow", new UndertowHttpServer()) + ); + } + + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("httpServers") + void multipleWebFluxApps(AbstractHttpServer server) throws Exception { AnnotationConfigApplicationContext context1 = new AnnotationConfigApplicationContext(WebAppConfig.class); AnnotationConfigApplicationContext context2 = new AnnotationConfigApplicationContext(WebAppConfig.class); HttpHandler webApp1Handler = WebHttpHandlerBuilder.applicationContext(context1).build(); HttpHandler webApp2Handler = WebHttpHandlerBuilder.applicationContext(context2).build(); - ReactorHttpServer server = new ReactorHttpServer(); server.registerHttpHandler("/webApp1", webApp1Handler); server.registerHttpHandler("/webApp2", webApp2Handler); server.afterPropertiesSet(); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 6ffd05022e..39b24fc58c 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -53,6 +53,7 @@ import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; @@ -127,7 +128,7 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @ParameterizedSseTest void sseAsEvent(HttpServer httpServer, ClientHttpConnector connector) throws Exception { - assumeTrue(httpServer instanceof JettyHttpServer); + assumeTrue(httpServer instanceof JettyHttpServer || httpServer instanceof JettyCoreHttpServer); startServer(httpServer, connector); @@ -302,18 +303,21 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { static Stream arguments() { return Stream.of( - args(new JettyHttpServer(), new ReactorClientHttpConnector()), - args(new JettyHttpServer(), new JettyClientHttpConnector()), - args(new JettyHttpServer(), new HttpComponentsClientHttpConnector()), - args(new ReactorHttpServer(), new ReactorClientHttpConnector()), - args(new ReactorHttpServer(), new JettyClientHttpConnector()), - args(new ReactorHttpServer(), new HttpComponentsClientHttpConnector()), - args(new TomcatHttpServer(), new ReactorClientHttpConnector()), - args(new TomcatHttpServer(), new JettyClientHttpConnector()), - args(new TomcatHttpServer(), new HttpComponentsClientHttpConnector()), - args(new UndertowHttpServer(), new ReactorClientHttpConnector()), - args(new UndertowHttpServer(), new JettyClientHttpConnector()), - args(new UndertowHttpServer(), new HttpComponentsClientHttpConnector()) + args(new JettyHttpServer(), new ReactorClientHttpConnector()), + args(new JettyHttpServer(), new JettyClientHttpConnector()), + args(new JettyHttpServer(), new HttpComponentsClientHttpConnector()), + args(new JettyCoreHttpServer(), new ReactorClientHttpConnector()), + args(new JettyCoreHttpServer(), new JettyClientHttpConnector()), + args(new JettyCoreHttpServer(), new HttpComponentsClientHttpConnector()), + args(new ReactorHttpServer(), new ReactorClientHttpConnector()), + args(new ReactorHttpServer(), new JettyClientHttpConnector()), + args(new ReactorHttpServer(), new HttpComponentsClientHttpConnector()), + args(new TomcatHttpServer(), new ReactorClientHttpConnector()), + args(new TomcatHttpServer(), new JettyClientHttpConnector()), + args(new TomcatHttpServer(), new HttpComponentsClientHttpConnector()), + args(new UndertowHttpServer(), new ReactorClientHttpConnector()), + args(new UndertowHttpServer(), new JettyClientHttpConnector()), + args(new UndertowHttpServer(), new HttpComponentsClientHttpConnector()) ); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java index d6954505fa..9392f0cca9 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java @@ -45,6 +45,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.web.filter.reactive.ServerWebExchangeContextFilter; import org.springframework.web.reactive.DispatcherHandler; +import org.springframework.web.reactive.socket.client.JettyWebSocketClient; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.TomcatWebSocketClient; import org.springframework.web.reactive.socket.client.UndertowWebSocketClient; @@ -53,6 +54,7 @@ import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.WebSocketService; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; @@ -61,6 +63,7 @@ import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpg import org.springframework.web.server.WebFilter; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; @@ -90,6 +93,7 @@ abstract class AbstractReactiveWebSocketIntegrationTests { WebSocketClient[] clients = new WebSocketClient[] { new TomcatWebSocketClient(), + new JettyWebSocketClient(), new ReactorNettyWebSocketClient(), new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY)) }; @@ -97,6 +101,7 @@ abstract class AbstractReactiveWebSocketIntegrationTests { Map> servers = new LinkedHashMap<>(); servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class); servers.put(new JettyHttpServer(), JettyConfig.class); + servers.put(new JettyCoreHttpServer(), JettyCoreConfig.class); servers.put(new ReactorHttpServer(), ReactorNettyConfig.class); servers.put(new UndertowHttpServer(), UndertowConfig.class); @@ -241,4 +246,12 @@ abstract class AbstractReactiveWebSocketIntegrationTests { } } + @Configuration + static class JettyCoreConfig extends AbstractHandlerAdapterConfig { + + @Override + protected RequestUpgradeStrategy getUpgradeStrategy() { + return new JettyCoreRequestUpgradeStrategy(); + } + } } diff --git a/spring-websocket/spring-websocket.gradle b/spring-websocket/spring-websocket.gradle index 72df03b20d..2250f5fdf3 100644 --- a/spring-websocket/spring-websocket.gradle +++ b/spring-websocket/spring-websocket.gradle @@ -19,6 +19,7 @@ dependencies { optional("org.eclipse.jetty.ee10:jetty-ee10-webapp") { exclude group: "jakarta.servlet", module: "jakarta.servlet-api" } + optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-api") optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jakarta-server") optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") { exclude group: "jakarta.servlet", module: "jakarta.servlet-api" diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java index b996b920e1..f57dff5008 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java @@ -20,17 +20,11 @@ import java.nio.ByteBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.Callback; -import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.eclipse.jetty.websocket.core.OpCode; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.CloseStatus; @@ -40,23 +34,22 @@ import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator; /** - * Adapts {@link WebSocketHandler} to the Jetty WebSocket API. + * Adapts {@link WebSocketHandler} to the Jetty WebSocket API {@link org.eclipse.jetty.websocket.api.Session.Listener}. * * @author Rossen Stoyanchev * @since 4.0 */ -@WebSocket -public class JettyWebSocketHandlerAdapter { - - private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); +public class JettyWebSocketHandlerAdapter implements Session.Listener { private static final Log logger = LogFactory.getLog(JettyWebSocketHandlerAdapter.class); - private final WebSocketHandler webSocketHandler; private final JettyWebSocketSession wsSession; + @Nullable + private Session nativeSession; + public JettyWebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebSocketSession wsSession) { Assert.notNull(webSocketHandler, "WebSocketHandler must not be null"); @@ -65,69 +58,60 @@ public class JettyWebSocketHandlerAdapter { this.wsSession = wsSession; } - - @OnWebSocketOpen + @Override public void onWebSocketOpen(Session session) { try { + this.nativeSession = session; this.wsSession.initializeNativeSession(session); this.webSocketHandler.afterConnectionEstablished(this.wsSession); + this.nativeSession.demand(); } catch (Exception ex) { - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + tryCloseWithError(ex); } } - @OnWebSocketMessage + @Override public void onWebSocketText(String payload) { + Assert.state(this.nativeSession != null, "No native session available"); TextMessage message = new TextMessage(payload); try { this.webSocketHandler.handleMessage(this.wsSession, message); + this.nativeSession.demand(); } catch (Exception ex) { - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + tryCloseWithError(ex); } } - @OnWebSocketMessage + @Override public void onWebSocketBinary(ByteBuffer payload, Callback callback) { - BinaryMessage message = new BinaryMessage(copyByteBuffer(payload), true); + Assert.state(this.nativeSession != null, "No native session available"); + BinaryMessage message = new BinaryMessage(BufferUtil.copy(payload), true); + callback.succeed(); try { this.webSocketHandler.handleMessage(this.wsSession, message); - callback.succeed(); + this.nativeSession.demand(); } catch (Exception ex) { - callback.fail(ex); - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + tryCloseWithError(ex); } } - @OnWebSocketFrame - public void onWebSocketFrame(Frame frame, Callback callback) { - if (OpCode.PONG == frame.getOpCode()) { - ByteBuffer payload = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; - PongMessage message = new PongMessage(copyByteBuffer(payload)); - try { - this.webSocketHandler.handleMessage(this.wsSession, message); - callback.succeed(); - } - catch (Exception ex) { - callback.fail(ex); - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); - } + @Override + public void onWebSocketPong(ByteBuffer payload) { + Assert.state(this.nativeSession != null, "No native session available"); + PongMessage message = new PongMessage(BufferUtil.copy(payload)); + try { + this.webSocketHandler.handleMessage(this.wsSession, message); + this.nativeSession.demand(); } - else { - callback.succeed(); + catch (Exception ex) { + tryCloseWithError(ex); } } - private static ByteBuffer copyByteBuffer(ByteBuffer src) { - ByteBuffer dest = ByteBuffer.allocate(src.remaining()); - dest.put(src); - dest.flip(); - return dest; - } - - @OnWebSocketClose + @Override public void onWebSocketClose(int statusCode, String reason) { CloseStatus closeStatus = new CloseStatus(statusCode, reason); try { @@ -135,19 +119,32 @@ public class JettyWebSocketHandlerAdapter { } catch (Exception ex) { if (logger.isWarnEnabled()) { - logger.warn("Unhandled exception after connection closed for " + this, ex); + logger.warn("Unhandled exception from afterConnectionClosed for " + this, ex); } } } - @OnWebSocketError + @Override public void onWebSocketError(Throwable cause) { try { this.webSocketHandler.handleTransportError(this.wsSession, cause); } catch (Exception ex) { - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + if (logger.isWarnEnabled()) { + logger.warn("Unhandled exception from handleTransportError for " + this, ex); + } } } + private void tryCloseWithError(Throwable t) { + if (this.nativeSession != null) { + if (this.nativeSession.isOpen()) { + ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger); + } + else { + // Session might be O-SHUT waiting for response close frame, so abort to close the connection. + this.nativeSession.disconnect(); + } + } + } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java index 026f9af4fd..2ed4542e31 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java @@ -45,7 +45,7 @@ import org.springframework.web.socket.server.HandshakeFailureException; import org.springframework.web.socket.server.RequestUpgradeStrategy; /** - * A {@link RequestUpgradeStrategy} for Jetty 11. + * A {@link RequestUpgradeStrategy} for Jetty 12 EE10. * * @author Rossen Stoyanchev * @since 5.3.4