Implement Eclipse Jetty core HTTP handler adapter

This provides an implementation of an HTTP Handler Adapter that is coded
directly to the Eclipse Jetty core API, bypassing any servlet
implementation.

This includes a Jetty implementation of the spring `WebSocketClient`
interface, `JettyWebSocketClient`, using an explicit dependency to the
jetty-websocket-api.

Closes gh-32097

Co-authored-by: Lachlan Roberts <lachlan@webtide.com>
Co-authored-by: Arjen Poutsma <arjen.poutsma@broadcom.com>
This commit is contained in:
gregw 2024-01-15 19:05:09 +11:00 committed by Simon Baslé
parent b7ec028149
commit 0a60c622cc
33 changed files with 1701 additions and 526 deletions

View File

@ -16,8 +16,8 @@ dependencies {
api(platform("org.apache.groovy:groovy-bom:4.0.21")) api(platform("org.apache.groovy:groovy-bom:4.0.21"))
api(platform("org.apache.logging.log4j:log4j-bom:2.21.1")) api(platform("org.apache.logging.log4j:log4j-bom:2.21.1"))
api(platform("org.assertj:assertj-bom:3.26.0")) api(platform("org.assertj:assertj-bom:3.26.0"))
api(platform("org.eclipse.jetty:jetty-bom:12.0.10")) api(platform("org.eclipse.jetty:jetty-bom:12.0.11"))
api(platform("org.eclipse.jetty.ee10:jetty-ee10-bom:12.0.10")) api(platform("org.eclipse.jetty.ee10:jetty-ee10-bom:12.0.11"))
api(platform("org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.7.3")) api(platform("org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.7.3"))
api(platform("org.jetbrains.kotlinx:kotlinx-serialization-bom:1.6.0")) api(platform("org.jetbrains.kotlinx:kotlinx-serialization-bom:1.6.0"))
api(platform("org.junit:junit-bom:5.10.3")) api(platform("org.junit:junit-bom:5.10.3"))

View File

@ -81,6 +81,7 @@ dependencies {
optional("io.smallrye.reactive:mutiny") optional("io.smallrye.reactive:mutiny")
optional("net.sf.jopt-simple:jopt-simple") optional("net.sf.jopt-simple:jopt-simple")
optional("org.aspectj:aspectjweaver") optional("org.aspectj:aspectjweaver")
optional("org.eclipse.jetty:jetty-io")
optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-reflect")
optional("org.jetbrains.kotlin:kotlin-stdlib") optional("org.jetbrains.kotlin:kotlin-stdlib")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core") optional("org.jetbrains.kotlinx:kotlinx-coroutines-core")

View File

@ -355,7 +355,7 @@ public class DefaultDataBuffer implements DataBuffer {
} }
@Override @Override
public DataBuffer split(int index) { public DefaultDataBuffer split(int index) {
checkIndex(index); checkIndex(index);
ByteBuffer split = this.byteBuffer.duplicate().clear() ByteBuffer split = this.byteBuffer.duplicate().clear()

View File

@ -0,0 +1,359 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
import org.eclipse.jetty.io.Content;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Implementation of the {@code DataBuffer} interface that can wrap a Jetty
* {@link Content.Chunk}. Typically constructed with {@link JettyDataBufferFactory}.
*
* @author Greg Wilkins
* @author Lachlan Roberts
* @author Arjen Poutsma
* @since 6.2
*/
public final class JettyDataBuffer implements PooledDataBuffer {
private final DefaultDataBuffer delegate;
@Nullable
private final Content.Chunk chunk;
private final JettyDataBufferFactory bufferFactory;
private final AtomicInteger refCount = new AtomicInteger(1);
JettyDataBuffer(JettyDataBufferFactory bufferFactory, DefaultDataBuffer delegate, Content.Chunk chunk) {
Assert.notNull(bufferFactory, "BufferFactory must not be null");
Assert.notNull(delegate, "Delegate must not be null");
Assert.notNull(chunk, "Chunk must not be null");
this.bufferFactory = bufferFactory;
this.delegate = delegate;
this.chunk = chunk;
this.chunk.retain();
}
JettyDataBuffer(JettyDataBufferFactory bufferFactory, DefaultDataBuffer delegate) {
Assert.notNull(bufferFactory, "BufferFactory must not be null");
Assert.notNull(delegate, "Delegate must not be null");
this.bufferFactory = bufferFactory;
this.delegate = delegate;
this.chunk = null;
}
@Override
public boolean isAllocated() {
return this.refCount.get() > 0;
}
@Override
public PooledDataBuffer retain() {
int result = this.refCount.updateAndGet(c -> {
if (c != 0) {
return c + 1;
}
else {
return 0;
}
});
if (result != 0 && this.chunk != null) {
this.chunk.retain();
}
return this;
}
@Override
public PooledDataBuffer touch(Object hint) {
return this;
}
@Override
public boolean release() {
int result = this.refCount.updateAndGet(c -> {
if (c != 0) {
return c - 1;
}
else {
throw new IllegalStateException("JettyDataBuffer already released: " + this);
}
});
if (this.chunk != null) {
return this.chunk.release();
}
else {
return result == 0;
}
}
@Override
public DataBufferFactory factory() {
return this.bufferFactory;
}
// delegation
@Override
public int indexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.indexOf(predicate, fromIndex);
}
@Override
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.lastIndexOf(predicate, fromIndex);
}
@Override
public int readableByteCount() {
return this.delegate.readableByteCount();
}
@Override
public int writableByteCount() {
return this.delegate.writableByteCount();
}
@Override
public int capacity() {
return this.delegate.capacity();
}
@Override
@Deprecated
public DataBuffer capacity(int capacity) {
this.delegate.capacity(capacity);
return this;
}
@Override
public DataBuffer ensureWritable(int capacity) {
this.delegate.ensureWritable(capacity);
return this;
}
@Override
public int readPosition() {
return this.delegate.readPosition();
}
@Override
public DataBuffer readPosition(int readPosition) {
this.delegate.readPosition(readPosition);
return this;
}
@Override
public int writePosition() {
return this.delegate.writePosition();
}
@Override
public DataBuffer writePosition(int writePosition) {
this.delegate.writePosition(writePosition);
return this;
}
@Override
public byte getByte(int index) {
return this.delegate.getByte(index);
}
@Override
public byte read() {
return this.delegate.read();
}
@Override
public DataBuffer read(byte[] destination) {
this.delegate.read(destination);
return this;
}
@Override
public DataBuffer read(byte[] destination, int offset, int length) {
this.delegate.read(destination, offset, length);
return this;
}
@Override
public DataBuffer write(byte b) {
this.delegate.write(b);
return this;
}
@Override
public DataBuffer write(byte[] source) {
this.delegate.write(source);
return this;
}
@Override
public DataBuffer write(byte[] source, int offset, int length) {
this.delegate.write(source, offset, length);
return this;
}
@Override
public DataBuffer write(DataBuffer... buffers) {
this.delegate.write(buffers);
return this;
}
@Override
public DataBuffer write(ByteBuffer... buffers) {
this.delegate.write(buffers);
return this;
}
@Override
@Deprecated
public DataBuffer slice(int index, int length) {
DefaultDataBuffer delegateSlice = this.delegate.slice(index, length);
if (this.chunk != null) {
this.chunk.retain();
return new JettyDataBuffer(this.bufferFactory, delegateSlice, this.chunk);
}
else {
return new JettyDataBuffer(this.bufferFactory, delegateSlice);
}
}
@Override
public DataBuffer split(int index) {
DefaultDataBuffer delegateSplit = this.delegate.split(index);
if (this.chunk != null) {
this.chunk.retain();
return new JettyDataBuffer(this.bufferFactory, delegateSplit, this.chunk);
}
else {
return new JettyDataBuffer(this.bufferFactory, delegateSplit);
}
}
@Override
@Deprecated
public ByteBuffer asByteBuffer() {
return this.delegate.asByteBuffer();
}
@Override
@Deprecated
public ByteBuffer asByteBuffer(int index, int length) {
return this.delegate.asByteBuffer(index, length);
}
@Override
@Deprecated
public ByteBuffer toByteBuffer(int index, int length) {
return this.delegate.toByteBuffer(index, length);
}
@Override
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) {
this.delegate.toByteBuffer(srcPos, dest, destPos, length);
}
@Override
public ByteBufferIterator readableByteBuffers() {
ByteBufferIterator delegateIterator = this.delegate.readableByteBuffers();
if (this.chunk != null) {
return new JettyByteBufferIterator(delegateIterator, this.chunk);
}
else {
return delegateIterator;
}
}
@Override
public ByteBufferIterator writableByteBuffers() {
ByteBufferIterator delegateIterator = this.delegate.writableByteBuffers();
if (this.chunk != null) {
return new JettyByteBufferIterator(delegateIterator, this.chunk);
}
else {
return delegateIterator;
}
}
@Override
public String toString(int index, int length, Charset charset) {
return this.delegate.toString(index, length, charset);
}
@Override
public int hashCode() {
return this.delegate.hashCode();
}
@Override
public boolean equals(Object o) {
return this == o || (o instanceof JettyDataBuffer other &&
this.delegate.equals(other.delegate));
}
@Override
public String toString() {
return String.format("JettyDataBuffer (r: %d, w: %d, c: %d)",
readPosition(), writePosition(), capacity());
}
private static final class JettyByteBufferIterator implements ByteBufferIterator {
private final ByteBufferIterator delegate;
private final Content.Chunk chunk;
public JettyByteBufferIterator(ByteBufferIterator delegate, Content.Chunk chunk) {
Assert.notNull(delegate, "Delegate must not be null");
Assert.notNull(chunk, "Chunk must not be null");
this.delegate = delegate;
this.chunk = chunk;
this.chunk.retain();
}
@Override
public void close() {
this.delegate.close();
this.chunk.release();
}
@Override
public boolean hasNext() {
return this.delegate.hasNext();
}
@Override
public ByteBuffer next() {
return this.delegate.next();
}
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.List;
import org.eclipse.jetty.io.Content;
/**
* Implementation of the {@code DataBufferFactory} interface that creates
* {@link JettyDataBuffer} instances.
*
* @author Arjen Poutsma
* @since 6.2
*/
public class JettyDataBufferFactory implements DataBufferFactory {
private final DefaultDataBufferFactory delegate;
/**
* Creates a new {@code JettyDataBufferFactory} with default settings.
*/
public JettyDataBufferFactory() {
this(false);
}
/**
* Creates a new {@code JettyDataBufferFactory}, indicating whether direct
* buffers should be created by {@link #allocateBuffer()} and
* {@link #allocateBuffer(int)}.
* @param preferDirect {@code true} if direct buffers are to be preferred;
* {@code false} otherwise
*/
public JettyDataBufferFactory(boolean preferDirect) {
this(preferDirect, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY);
}
/**
* Creates a new {@code JettyDataBufferFactory}, indicating whether direct
* buffers should be created by {@link #allocateBuffer()} and
* {@link #allocateBuffer(int)}, and what the capacity is to be used for
* {@link #allocateBuffer()}.
* @param preferDirect {@code true} if direct buffers are to be preferred;
* {@code false} otherwise
*/
public JettyDataBufferFactory(boolean preferDirect, int defaultInitialCapacity) {
this.delegate = new DefaultDataBufferFactory(preferDirect, defaultInitialCapacity);
}
@Override
@Deprecated
public JettyDataBuffer allocateBuffer() {
DefaultDataBuffer delegate = this.delegate.allocateBuffer();
return new JettyDataBuffer(this, delegate);
}
@Override
public JettyDataBuffer allocateBuffer(int initialCapacity) {
DefaultDataBuffer delegate = this.delegate.allocateBuffer(initialCapacity);
return new JettyDataBuffer(this, delegate);
}
@Override
public JettyDataBuffer wrap(ByteBuffer byteBuffer) {
DefaultDataBuffer delegate = this.delegate.wrap(byteBuffer);
return new JettyDataBuffer(this, delegate);
}
@Override
public JettyDataBuffer wrap(byte[] bytes) {
DefaultDataBuffer delegate = this.delegate.wrap(bytes);
return new JettyDataBuffer(this, delegate);
}
public JettyDataBuffer wrap(Content.Chunk chunk) {
ByteBuffer byteBuffer = chunk.getByteBuffer();
DefaultDataBuffer delegate = this.delegate.wrap(byteBuffer);
return new JettyDataBuffer(this, delegate, chunk);
}
@Override
public JettyDataBuffer join(List<? extends DataBuffer> dataBuffers) {
DefaultDataBuffer delegate = this.delegate.join(dataBuffers);
return new JettyDataBuffer(this, delegate);
}
@Override
public boolean isDirect() {
return this.delegate.isDirect();
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.Content;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
/**
* @author Arjen Poutsma
*/
public class JettyDataBufferTests {
private final JettyDataBufferFactory dataBufferFactory = new JettyDataBufferFactory();
@Test
void releaseRetainChunk() {
ByteBuffer buffer = ByteBuffer.allocate(3);
Content.Chunk mockChunk = mock();
given(mockChunk.getByteBuffer()).willReturn(buffer);
given(mockChunk.release()).willReturn(false, false, true);
JettyDataBuffer dataBuffer = this.dataBufferFactory.wrap(mockChunk);
dataBuffer.retain();
dataBuffer.retain();
assertThat(dataBuffer.release()).isFalse();
assertThat(dataBuffer.release()).isFalse();
assertThat(dataBuffer.release()).isTrue();
assertThatIllegalStateException().isThrownBy(dataBuffer::release);
then(mockChunk).should(times(3)).retain();
then(mockChunk).should(times(3)).release();
}
}

View File

@ -69,6 +69,15 @@ class PooledDataBufferTests {
} }
} }
@Nested
class Jetty implements PooledDataBufferTestingTrait {
@Override
public DataBufferFactory createDataBufferFactory() {
return new JettyDataBufferFactory();
}
}
interface PooledDataBufferTestingTrait { interface PooledDataBufferTestingTrait {
@ -82,10 +91,14 @@ class PooledDataBufferTests {
default void retainAndRelease() { default void retainAndRelease() {
PooledDataBuffer buffer = createDataBuffer(1); PooledDataBuffer buffer = createDataBuffer(1);
buffer.write((byte) 'a'); buffer.write((byte) 'a');
assertThat(buffer.isAllocated()).isTrue();
buffer.retain(); buffer.retain();
assertThat(buffer.isAllocated()).isTrue();
assertThat(buffer.release()).isFalse(); assertThat(buffer.release()).isFalse();
assertThat(buffer.isAllocated()).isTrue();
assertThat(buffer.release()).isTrue(); assertThat(buffer.release()).isTrue();
assertThat(buffer.isAllocated()).isFalse();
} }
@Test @Test

View File

@ -72,6 +72,7 @@ dependencies {
because("needed by Netty's SelfSignedCertificate on JDK 15+") because("needed by Netty's SelfSignedCertificate on JDK 15+")
} }
testFixturesImplementation("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") testFixturesImplementation("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server")
testFixturesImplementation("org.eclipse.jetty.websocket:jetty-websocket-jetty-server")
testImplementation(project(":spring-core-test")) testImplementation(project(":spring-core-test"))
testImplementation(testFixtures(project(":spring-beans"))) testImplementation(testFixtures(project(":spring-beans")))
testImplementation(testFixtures(project(":spring-context"))) testImplementation(testFixtures(project(":spring-context")))

View File

@ -17,24 +17,16 @@
package org.springframework.http.client.reactive; package org.springframework.http.client.reactive;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.IntPredicate;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.io.Content;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.JettyDataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.io.buffer.TouchableDataBuffer;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -50,7 +42,7 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
private final HttpClient httpClient; private final HttpClient httpClient;
private DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance; private JettyDataBufferFactory bufferFactory = new JettyDataBufferFactory();
/** /**
@ -103,7 +95,7 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
/** /**
* Set the buffer factory to use. * Set the buffer factory to use.
*/ */
public void setBufferFactory(DataBufferFactory bufferFactory) { public void setBufferFactory(JettyDataBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
} }
@ -134,289 +126,9 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
private Mono<ClientHttpResponse> execute(JettyClientHttpRequest request) { private Mono<ClientHttpResponse> execute(JettyClientHttpRequest request) {
return Mono.fromDirect(request.toReactiveRequest() return Mono.fromDirect(request.toReactiveRequest()
.response((reactiveResponse, chunkPublisher) -> { .response((reactiveResponse, chunkPublisher) -> {
Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer); Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this.bufferFactory::wrap);
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
})); }));
} }
private DataBuffer toDataBuffer(Content.Chunk chunk) {
DataBuffer delegate = this.bufferFactory.wrap(chunk.getByteBuffer());
return new JettyDataBuffer(delegate, chunk);
}
private static final class JettyDataBuffer implements PooledDataBuffer {
private final DataBuffer delegate;
private final Content.Chunk chunk;
private final AtomicInteger refCount = new AtomicInteger(1);
public JettyDataBuffer(DataBuffer delegate, Content.Chunk chunk) {
Assert.notNull(delegate, "Delegate must not be null");
Assert.notNull(chunk, "Chunk must not be null");
this.delegate = delegate;
this.chunk = chunk;
}
@Override
public boolean isAllocated() {
return this.refCount.get() > 0;
}
@Override
public PooledDataBuffer retain() {
if (this.delegate instanceof PooledDataBuffer pooledDelegate) {
pooledDelegate.retain();
}
this.chunk.retain();
this.refCount.getAndUpdate(c -> {
if (c != 0) {
return c + 1;
}
else {
return 0;
}
});
return this;
}
@Override
public boolean release() {
if (this.delegate instanceof PooledDataBuffer pooledDelegate) {
pooledDelegate.release();
}
this.chunk.release();
int refCount = this.refCount.updateAndGet(c -> {
if (c != 0) {
return c - 1;
}
else {
throw new IllegalStateException("already released " + this);
}
});
return refCount == 0;
}
@Override
public PooledDataBuffer touch(Object hint) {
if (this.delegate instanceof TouchableDataBuffer touchableDelegate) {
touchableDelegate.touch(hint);
}
return this;
}
// delegation
@Override
public DataBufferFactory factory() {
return this.delegate.factory();
}
@Override
public int indexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.indexOf(predicate, fromIndex);
}
@Override
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.lastIndexOf(predicate, fromIndex);
}
@Override
public int readableByteCount() {
return this.delegate.readableByteCount();
}
@Override
public int writableByteCount() {
return this.delegate.writableByteCount();
}
@Override
public int capacity() {
return this.delegate.capacity();
}
@Override
@Deprecated
public DataBuffer capacity(int capacity) {
this.delegate.capacity(capacity);
return this;
}
@Override
public DataBuffer ensureWritable(int capacity) {
this.delegate.ensureWritable(capacity);
return this;
}
@Override
public int readPosition() {
return this.delegate.readPosition();
}
@Override
public DataBuffer readPosition(int readPosition) {
this.delegate.readPosition(readPosition);
return this;
}
@Override
public int writePosition() {
return this.delegate.writePosition();
}
@Override
public DataBuffer writePosition(int writePosition) {
this.delegate.writePosition(writePosition);
return this;
}
@Override
public byte getByte(int index) {
return this.delegate.getByte(index);
}
@Override
public byte read() {
return this.delegate.read();
}
@Override
public DataBuffer read(byte[] destination) {
this.delegate.read(destination);
return this;
}
@Override
public DataBuffer read(byte[] destination, int offset, int length) {
this.delegate.read(destination, offset, length);
return this;
}
@Override
public DataBuffer write(byte b) {
this.delegate.write(b);
return this;
}
@Override
public DataBuffer write(byte[] source) {
this.delegate.write(source);
return this;
}
@Override
public DataBuffer write(byte[] source, int offset, int length) {
this.delegate.write(source, offset, length);
return this;
}
@Override
public DataBuffer write(DataBuffer... buffers) {
this.delegate.write(buffers);
return this;
}
@Override
public DataBuffer write(ByteBuffer... buffers) {
this.delegate.write(buffers);
return this;
}
@Override
@Deprecated
public DataBuffer slice(int index, int length) {
DataBuffer delegateSlice = this.delegate.slice(index, length);
this.chunk.retain();
return new JettyDataBuffer(delegateSlice, this.chunk);
}
@Override
public DataBuffer split(int index) {
DataBuffer delegateSplit = this.delegate.split(index);
this.chunk.retain();
return new JettyDataBuffer(delegateSplit, this.chunk);
}
@Override
@Deprecated
public ByteBuffer asByteBuffer() {
return this.delegate.asByteBuffer();
}
@Override
@Deprecated
public ByteBuffer asByteBuffer(int index, int length) {
return this.delegate.asByteBuffer(index, length);
}
@Override
@Deprecated
public ByteBuffer toByteBuffer(int index, int length) {
return this.delegate.toByteBuffer(index, length);
}
@Override
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) {
this.delegate.toByteBuffer(srcPos, dest, destPos, length);
}
@Override
public ByteBufferIterator readableByteBuffers() {
ByteBufferIterator delegateIterator = this.delegate.readableByteBuffers();
return new JettyByteBufferIterator(delegateIterator, this.chunk);
}
@Override
public ByteBufferIterator writableByteBuffers() {
ByteBufferIterator delegateIterator = this.delegate.writableByteBuffers();
return new JettyByteBufferIterator(delegateIterator, this.chunk);
}
@Override
public String toString(int index, int length, Charset charset) {
return this.delegate.toString(index, length, charset);
}
private static final class JettyByteBufferIterator implements ByteBufferIterator {
private final ByteBufferIterator delegate;
private final Content.Chunk chunk;
public JettyByteBufferIterator(ByteBufferIterator delegate, Content.Chunk chunk) {
Assert.notNull(delegate, "Delegate must not be null");
Assert.notNull(chunk, "Chunk must not be null");
this.delegate = delegate;
this.chunk = chunk;
this.chunk.retain();
}
@Override
public void close() {
this.delegate.close();
this.chunk.release();
}
@Override
public boolean hasNext() {
return this.delegate.hasNext();
}
@Override
public ByteBuffer next() {
return this.delegate.next();
}
}
}
} }

View File

@ -100,7 +100,6 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
return Flux.create(sink -> { return Flux.create(sink -> {
MultipartParser parser = new MultipartParser(sink, boundary, maxHeadersSize, headersCharset); MultipartParser parser = new MultipartParser(sink, boundary, maxHeadersSize, headersCharset);
sink.onCancel(parser::onSinkCancel); sink.onCancel(parser::onSinkCancel);
sink.onRequest(n -> parser.requestBuffer());
buffers.subscribe(parser); buffers.subscribe(parser);
}); });
} }
@ -112,8 +111,10 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
@Override @Override
protected void hookOnSubscribe(Subscription subscription) { protected void hookOnSubscribe(Subscription subscription) {
if (this.sink.requestedFromDownstream() > 0) {
requestBuffer(); requestBuffer();
} }
}
@Override @Override
protected void hookOnNext(DataBuffer value) { protected void hookOnNext(DataBuffer value) {

View File

@ -30,6 +30,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -70,7 +71,8 @@ class DefaultServerHttpRequestBuilder implements ServerHttpRequest.Builder {
Assert.notNull(original, "ServerHttpRequest is required"); Assert.notNull(original, "ServerHttpRequest is required");
this.uri = original.getURI(); this.uri = original.getURI();
this.headers = new HttpHeaders(original.getHeaders()); // original headers can be immutable, so create a copy
this.headers = new HttpHeaders(new LinkedMultiValueMap<>(original.getHeaders()));
this.httpMethod = original.getMethod(); this.httpMethod = original.getMethod();
this.contextPath = original.getPath().contextPath().value(); this.contextPath = original.getPath().contextPath().value();
this.remoteAddress = original.getRemoteAddress(); this.remoteAddress = original.getRemoteAddress();

View File

@ -0,0 +1,60 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.springframework.core.io.buffer.JettyDataBufferFactory;
import org.springframework.util.Assert;
/**
* Adapt {@link HttpHandler} to the Jetty {@link org.eclipse.jetty.server.Handler} abstraction.
*
* @author Greg Wilkins
* @author Lachlan Roberts
* @author Arjen Poutsma
* @since 6.2
*/
public class JettyCoreHttpHandlerAdapter extends Handler.Abstract.NonBlocking {
private final HttpHandler httpHandler;
private JettyDataBufferFactory dataBufferFactory = new JettyDataBufferFactory();
public JettyCoreHttpHandlerAdapter(HttpHandler httpHandler) {
this.httpHandler = httpHandler;
}
public void setDataBufferFactory(JettyDataBufferFactory dataBufferFactory) {
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
this.dataBufferFactory = dataBufferFactory;
}
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception {
this.httpHandler.handle(new JettyCoreServerHttpRequest(request, this.dataBufferFactory),
new JettyCoreServerHttpResponse(response, this.dataBufferFactory))
.subscribe(unused -> {}, callback::failed, callback::succeeded);
return true;
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.reactivestreams.FlowAdapters;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.JettyDataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.support.JettyHeadersAdapter;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* Adapt an Eclipse Jetty {@link Request} to a {@link org.springframework.http.server.ServerHttpRequest}.
*
* @author Greg Wilkins
* @author Arjen Poutsma
* @since 6.2
*/
class JettyCoreServerHttpRequest extends AbstractServerHttpRequest {
private final JettyDataBufferFactory dataBufferFactory;
private final Request request;
public JettyCoreServerHttpRequest(Request request, JettyDataBufferFactory dataBufferFactory) {
super(HttpMethod.valueOf(request.getMethod()),
request.getHttpURI().toURI(),
request.getContext().getContextPath(),
new HttpHeaders(new JettyHeadersAdapter(request.getHeaders())));
this.dataBufferFactory = dataBufferFactory;
this.request = request;
}
@Override
protected MultiValueMap<String, HttpCookie> initCookies() {
List<org.eclipse.jetty.http.HttpCookie> httpCookies = Request.getCookies(this.request);
if (httpCookies.isEmpty()) {
return CollectionUtils.toMultiValueMap(Collections.emptyMap());
}
MultiValueMap<String, HttpCookie> cookies =new LinkedMultiValueMap<>();
for (org.eclipse.jetty.http.HttpCookie c : httpCookies) {
cookies.add(c.getName(), new HttpCookie(c.getName(), c.getValue()));
}
return cookies;
}
@Override
@Nullable
public SslInfo initSslInfo() {
if (this.request.getConnectionMetaData().isSecure() &&
this.request.getAttribute(EndPoint.SslSessionData.ATTRIBUTE) instanceof EndPoint.SslSessionData sessionData) {
return new DefaultSslInfo(sessionData.sslSessionId(), sessionData.peerCertificates());
}
return null;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeRequest() {
return (T) this.request;
}
@Override
protected String initId() {
return this.request.getId();
}
@Override
@Nullable
public InetSocketAddress getLocalAddress() {
SocketAddress localAddress = this.request.getConnectionMetaData().getLocalSocketAddress();
return localAddress instanceof InetSocketAddress inet ? inet : null;
}
@Override
@Nullable
public InetSocketAddress getRemoteAddress() {
SocketAddress remoteAddress = this.request.getConnectionMetaData().getRemoteSocketAddress();
return remoteAddress instanceof InetSocketAddress inet ? inet : null;
}
@Override
public Flux<DataBuffer> getBody() {
// We access the request body as a Flow.Publisher, which is wrapped as an org.reactivestreams.Publisher and
// then wrapped as a Flux.
return Flux.from(FlowAdapters.toPublisher(Content.Source.asPublisher(this.request)))
.map(this.dataBufferFactory::wrap);
}
}

View File

@ -0,0 +1,237 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.HttpCookieUtils;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.JettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.support.JettyHeadersAdapter;
import org.springframework.lang.Nullable;
/**
* Adapt an Eclipse Jetty {@link Response} to a {@link org.springframework.http.server.ServerHttpResponse}.
*
* @author Greg Wilkins
* @author Lachlan Roberts
* @since 6.2
*/
class JettyCoreServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage {
private final Response response;
public JettyCoreServerHttpResponse(Response response, JettyDataBufferFactory dataBufferFactory) {
super(dataBufferFactory, new HttpHeaders(new JettyHeadersAdapter(response.getHeaders())));
this.response = response;
// remove all existing cookies from the response and add them to the cookie map, to be added back later
for (ListIterator<HttpField> i = this.response.getHeaders().listIterator(); i.hasNext(); ) {
HttpField f = i.next();
if (f instanceof HttpCookieUtils.SetCookieHttpField setCookieHttpField) {
HttpCookie httpCookie = setCookieHttpField.getHttpCookie();
ResponseCookie responseCookie = ResponseCookie.from(httpCookie.getName(), httpCookie.getValue())
.httpOnly(httpCookie.isHttpOnly())
.domain(httpCookie.getDomain())
.maxAge(httpCookie.getMaxAge())
.sameSite(httpCookie.getSameSite().name())
.secure(httpCookie.isSecure())
.partitioned(httpCookie.isPartitioned())
.build();
this.addCookie(responseCookie);
i.remove();
}
}
}
@Override
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
return Flux.from(body)
.concatMap(this::sendDataBuffer)
.then();
}
@Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return Flux.from(body).concatMap(this::writeWithInternal).then();
}
@Override
protected void applyStatusCode() {
HttpStatusCode status = getStatusCode();
this.response.setStatus(status == null ? 0 : status.value());
}
@Override
protected void applyHeaders() {
}
@Override
protected void applyCookies() {
this.getCookies().values().stream()
.flatMap(List::stream)
.forEach(cookie -> Response.addCookie(this.response, new ResponseHttpCookie(cookie)));
}
@Override
public Mono<Void> writeWith(Path file, long position, long count) {
Callback.Completable callback = new Callback.Completable();
Mono<Void> mono = Mono.fromFuture(callback);
try {
Content.copy(Content.Source.from(null, file, position, count), this.response, callback);
}
catch (Throwable th) {
callback.failed(th);
}
return doCommit(() -> mono);
}
private Mono<Void> sendDataBuffer(DataBuffer dataBuffer) {
return Mono.defer(() -> {
DataBuffer.ByteBufferIterator byteBufferIterator = dataBuffer.readableByteBuffers();
Callback.Completable callback = new Callback.Completable();
new IteratingCallback() {
@Override
protected Action process() {
if (!byteBufferIterator.hasNext()) {
return Action.SUCCEEDED;
}
response.write(false, byteBufferIterator.next(), this);
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess() {
byteBufferIterator.close();
DataBufferUtils.release(dataBuffer);
callback.complete(null);
}
@Override
protected void onCompleteFailure(Throwable cause) {
byteBufferIterator.close();
DataBufferUtils.release(dataBuffer);
callback.failed(cause);
}
}.iterate();
return Mono.fromFuture(callback);
});
}
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeResponse() {
return (T) this.response;
}
private static class ResponseHttpCookie implements org.eclipse.jetty.http.HttpCookie {
private final ResponseCookie responseCookie;
public ResponseHttpCookie(ResponseCookie responseCookie) {
this.responseCookie = responseCookie;
}
public ResponseCookie getResponseCookie() {
return this.responseCookie;
}
@Override
public String getName() {
return this.responseCookie.getName();
}
@Override
public String getValue() {
return this.responseCookie.getValue();
}
@Override
public int getVersion() {
return 0;
}
@Override
public long getMaxAge() {
return this.responseCookie.getMaxAge().toSeconds();
}
@Override
@Nullable
public String getComment() {
return null;
}
@Override
@Nullable
public String getDomain() {
return this.responseCookie.getDomain();
}
@Override
@Nullable
public String getPath() {
return this.responseCookie.getPath();
}
@Override
public boolean isSecure() {
return this.responseCookie.isSecure();
}
@Nullable
@Override
public SameSite getSameSite() {
// Adding non-null return site breaks tests.
return null;
}
@Override
public boolean isHttpOnly() {
return this.responseCookie.isHttpOnly();
}
@Override
public boolean isPartitioned() {
return this.responseCookie.isPartitioned();
}
@Override
public Map<String, String> getAttributes() {
return Collections.emptyMap();
}
}
}

View File

@ -17,9 +17,11 @@
package org.springframework.http.support; package org.springframework.http.support;
import java.util.AbstractSet; import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -44,6 +46,9 @@ public final class JettyHeadersAdapter implements MultiValueMap<String, String>
private final HttpFields headers; private final HttpFields headers;
@Nullable
private final HttpFields.Mutable mutable;
/** /**
* Creates a new {@code JettyHeadersAdapter} based on the given * Creates a new {@code JettyHeadersAdapter} based on the given
@ -53,6 +58,7 @@ public final class JettyHeadersAdapter implements MultiValueMap<String, String>
public JettyHeadersAdapter(HttpFields headers) { public JettyHeadersAdapter(HttpFields headers) {
Assert.notNull(headers, "Headers must not be null"); Assert.notNull(headers, "Headers must not be null");
this.headers = headers; this.headers = headers;
this.mutable = headers instanceof HttpFields.Mutable m ? m : null;
} }
@ -119,22 +125,36 @@ public final class JettyHeadersAdapter implements MultiValueMap<String, String>
@Override @Override
public boolean containsKey(Object key) { public boolean containsKey(Object key) {
return (key instanceof String headerName && this.headers.contains(headerName)); return (key instanceof String name && this.headers.contains(name));
} }
@Override @Override
public boolean containsValue(Object value) { public boolean containsValue(Object value) {
return (value instanceof String searchString && if (value instanceof String searchString) {
this.headers.stream().anyMatch(field -> field.contains(searchString))); for (HttpField field : this.headers) {
if (field.contains(searchString)) {
return true;
}
}
}
return false;
} }
@Nullable @Nullable
@Override @Override
public List<String> get(Object key) { public List<String> get(Object key) {
if (containsKey(key)) { List<String> list = null;
return this.headers.getValuesList((String) key); if (key instanceof String name) {
for (HttpField f : this.headers) {
if (f.is(name)) {
if (list == null) {
list = new ArrayList<>();
} }
return null; list.add(f.getValue());
}
}
}
return list;
} }
@Nullable @Nullable
@ -142,7 +162,21 @@ public final class JettyHeadersAdapter implements MultiValueMap<String, String>
public List<String> put(String key, List<String> value) { public List<String> put(String key, List<String> value) {
HttpFields.Mutable mutableHttpFields = mutableFields(); HttpFields.Mutable mutableHttpFields = mutableFields();
List<String> oldValues = get(key); List<String> oldValues = get(key);
mutableHttpFields.put(key, value);
if (oldValues == null) {
switch (value.size()) {
case 0 -> {}
case 1 -> mutableHttpFields.add(key, value.get(0));
default -> mutableHttpFields.add(key, value);
}
}
else {
switch (value.size()) {
case 0 -> mutableHttpFields.remove(key);
case 1 -> mutableHttpFields.put(key, value.get(0));
default -> mutableHttpFields.put(key, value);
}
}
return oldValues; return oldValues;
} }
@ -150,12 +184,20 @@ public final class JettyHeadersAdapter implements MultiValueMap<String, String>
@Override @Override
public List<String> remove(Object key) { public List<String> remove(Object key) {
HttpFields.Mutable mutableHttpFields = mutableFields(); HttpFields.Mutable mutableHttpFields = mutableFields();
List<String> list = null;
if (key instanceof String name) { if (key instanceof String name) {
List<String> oldValues = get(key); for (ListIterator<HttpField> i = mutableHttpFields.listIterator(); i.hasNext(); ) {
mutableHttpFields.remove(name); HttpField f = i.next();
return oldValues; if (f.is(name)) {
if (list == null) {
list = new ArrayList<>();
} }
return null; list.add(f.getValue());
i.remove();
}
}
}
return list;
} }
@Override @Override
@ -187,6 +229,7 @@ public final class JettyHeadersAdapter implements MultiValueMap<String, String>
public Iterator<Entry<String, List<String>>> iterator() { public Iterator<Entry<String, List<String>>> iterator() {
return new EntryIterator(); return new EntryIterator();
} }
@Override @Override
public int size() { public int size() {
return headers.size(); return headers.size();
@ -195,16 +238,12 @@ public final class JettyHeadersAdapter implements MultiValueMap<String, String>
} }
private HttpFields.Mutable mutableFields() { private HttpFields.Mutable mutableFields() {
if (this.headers instanceof HttpFields.Mutable mutableHttpFields) { if (this.mutable == null) {
return mutableHttpFields;
}
else {
throw new IllegalStateException("Immutable headers"); throw new IllegalStateException("Immutable headers");
} }
return this.mutable;
} }
@Override @Override
public String toString() { public String toString() {
return HttpHeaders.formatHeaders(this); return HttpHeaders.formatHeaders(this);

View File

@ -27,6 +27,7 @@ import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -87,8 +88,8 @@ class ErrorHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests {
// but an application can apply CompactPathRule via RewriteHandler: // but an application can apply CompactPathRule via RewriteHandler:
// https://www.eclipse.org/jetty/documentation/jetty-11/programming_guide.php // https://www.eclipse.org/jetty/documentation/jetty-11/programming_guide.php
HttpStatus expectedStatus = HttpStatus expectedStatus = (httpServer instanceof JettyHttpServer || httpServer instanceof JettyCoreHttpServer
(httpServer instanceof JettyHttpServer ? HttpStatus.BAD_REQUEST : HttpStatus.OK); ? HttpStatus.BAD_REQUEST : HttpStatus.OK);
assertThat(response.getStatusCode()).isEqualTo(expectedStatus); assertThat(response.getStatusCode()).isEqualTo(expectedStatus);
} }

View File

@ -30,6 +30,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
@ -54,8 +55,8 @@ class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@ParameterizedHttpServerTest @ParameterizedHttpServerTest
void zeroCopy(HttpServer httpServer) throws Exception { void zeroCopy(HttpServer httpServer) throws Exception {
assumeTrue(httpServer instanceof ReactorHttpServer || httpServer instanceof UndertowHttpServer, assumeTrue(httpServer instanceof ReactorHttpServer || httpServer instanceof UndertowHttpServer
"Zero-copy does not support Servlet"); || httpServer instanceof JettyCoreHttpServer, "Zero-copy does not support Servlet");
startServer(httpServer); startServer(httpServer);

View File

@ -126,6 +126,7 @@ public abstract class AbstractHttpHandlerIntegrationTests {
static Stream<Named<HttpServer>> httpServers() { static Stream<Named<HttpServer>> httpServers() {
return Stream.of( return Stream.of(
named("Jetty", new JettyHttpServer()), named("Jetty", new JettyHttpServer()),
named("Jetty Core", new JettyCoreHttpServer()),
named("Reactor Netty", new ReactorHttpServer()), named("Reactor Netty", new ReactorHttpServer()),
named("Tomcat", new TomcatHttpServer()), named("Tomcat", new TomcatHttpServer()),
named("Undertow", new UndertowHttpServer()) named("Undertow", new UndertowHttpServer())

View File

@ -0,0 +1,98 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.testfixture.http.server.reactive.bootstrap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.server.ServerWebSocketContainer;
import org.springframework.http.server.reactive.JettyCoreHttpHandlerAdapter;
/**
* @author Rossen Stoyanchev
* @author Sam Brannen
* @author Greg Wilkins
* @since 6.2
*/
public class JettyCoreHttpServer extends AbstractHttpServer {
protected Log logger = LogFactory.getLog(getClass().getName());
private ArrayByteBufferPool byteBufferPool;
private Server jettyServer;
@Override
protected void initServer() {
if (logger.isTraceEnabled())
this.byteBufferPool = new ArrayByteBufferPool.Tracking();
this.jettyServer = new Server(null, null, byteBufferPool);
ServerConnector connector = new ServerConnector(this.jettyServer);
connector.setHost(getHost());
connector.setPort(getPort());
this.jettyServer.addConnector(connector);
this.jettyServer.setHandler(createHandlerAdapter());
ServerWebSocketContainer.ensure(jettyServer);
}
private JettyCoreHttpHandlerAdapter createHandlerAdapter() {
return new JettyCoreHttpHandlerAdapter(resolveHttpHandler());
}
@Override
protected void startInternal() throws Exception {
this.jettyServer.start();
setPort(((ServerConnector) this.jettyServer.getConnectors()[0]).getLocalPort());
}
@Override
protected void stopInternal() {
boolean wasRunning = this.jettyServer.isRunning();
try {
this.jettyServer.stop();
}
catch (Exception ex) {
// ignore
}
// TODO remove this or make debug only
if (wasRunning && this.byteBufferPool instanceof ArrayByteBufferPool.Tracking tracking) {
if (!tracking.getLeaks().isEmpty()) {
System.err.println("Leaks:\n" + tracking.dumpLeaks());
throw new IllegalStateException("LEAKS");
}
}
}
@Override
protected void resetInternal() {
try {
if (this.jettyServer.isRunning()) {
stopInternal();
}
this.jettyServer.destroy();
}
finally {
this.jettyServer = null;
}
}
}

View File

@ -54,7 +54,6 @@ public class JettyHttpServer extends AbstractHttpServer {
connector.setPort(getPort()); connector.setPort(getPort());
this.jettyServer.addConnector(connector); this.jettyServer.addConnector(connector);
this.jettyServer.setHandler(this.contextHandler); this.jettyServer.setHandler(this.contextHandler);
this.contextHandler.start();
} }
private ServletHttpHandlerAdapter createServletAdapter() { private ServletHttpHandlerAdapter createServletAdapter() {
@ -70,43 +69,25 @@ public class JettyHttpServer extends AbstractHttpServer {
@Override @Override
protected void stopInternal() throws Exception { protected void stopInternal() throws Exception {
try { try {
if (this.contextHandler.isRunning()) {
this.contextHandler.stop();
}
}
finally {
try {
if (this.jettyServer.isRunning()) {
// Do not configure a large stop timeout. For example, setting a stop timeout
// of 5000 adds an additional 1-2 seconds to the runtime of each test using
// the Jetty sever, resulting in 2-4 extra minutes of overall build time.
this.jettyServer.setStopTimeout(100);
this.jettyServer.stop(); this.jettyServer.stop();
this.jettyServer.destroy();
}
} }
catch (Exception ex) { catch (Exception ex) {
// ignore // ignore
} }
} }
}
@Override @Override
protected void resetInternal() { protected void resetInternal() {
try { try {
if (this.jettyServer.isRunning()) { if (this.jettyServer.isRunning()) {
// Do not configure a large stop timeout. For example, setting a stop timeout
// of 5000 adds an additional 1-2 seconds to the runtime of each test using
// the Jetty sever, resulting in 2-4 extra minutes of overall build time.
this.jettyServer.setStopTimeout(100);
this.jettyServer.stop(); this.jettyServer.stop();
this.jettyServer.destroy();
} }
} }
catch (Exception ex) { catch (Exception ex) {
throw new IllegalStateException(ex); throw new IllegalStateException(ex);
} }
finally { finally {
this.jettyServer.destroy();
this.jettyServer = null; this.jettyServer = null;
this.contextHandler = null; this.contextHandler = null;
} }

View File

@ -27,6 +27,8 @@ dependencies {
optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") { optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") {
exclude group: "jakarta.servlet", module: "jakarta.servlet-api" exclude group: "jakarta.servlet", module: "jakarta.servlet-api"
} }
optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-server")
optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-client")
optional("org.freemarker:freemarker") optional("org.freemarker:freemarker")
optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-reflect")
optional("org.jetbrains.kotlin:kotlin-stdlib") optional("org.jetbrains.kotlin:kotlin-stdlib")

View File

@ -22,16 +22,9 @@ import java.nio.charset.StandardCharsets;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.IntPredicate; import java.util.function.IntPredicate;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Callback; import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Frame;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.OpCode;
import org.springframework.core.io.buffer.CloseableDataBuffer; import org.springframework.core.io.buffer.CloseableDataBuffer;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
@ -44,18 +37,14 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type; import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/** /**
* Jetty {@link WebSocket @WebSocket} handler that delegates events to a * Jetty {@link org.eclipse.jetty.websocket.api.Session.Listener} handler that delegates events to a
* reactive {@link WebSocketHandler} and its session. * reactive {@link WebSocketHandler} and its session.
* *
* @author Violeta Georgieva * @author Violeta Georgieva
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
*/ */
@WebSocket public class JettyWebSocketHandlerAdapter implements Session.Listener {
public class JettyWebSocketHandlerAdapter {
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
private final WebSocketHandler delegateHandler; private final WebSocketHandler delegateHandler;
@ -74,70 +63,62 @@ public class JettyWebSocketHandlerAdapter {
this.sessionFactory = sessionFactory; this.sessionFactory = sessionFactory;
} }
@Override
@OnWebSocketOpen
public void onWebSocketOpen(Session session) { public void onWebSocketOpen(Session session) {
this.delegateSession = this.sessionFactory.apply(session); JettyWebSocketSession delegateSession = this.sessionFactory.apply(session);
this.delegateHandler.handle(this.delegateSession) this.delegateSession = delegateSession;
this.delegateHandler.handle(delegateSession)
.checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]") .checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]")
.subscribe(this.delegateSession); .subscribe(unused -> {}, delegateSession::onHandlerError, delegateSession::onHandleComplete);
} }
@OnWebSocketMessage @Override
public void onWebSocketText(String message) { public void onWebSocketText(String message) {
if (this.delegateSession != null) { Assert.state(this.delegateSession != null, "No delegate session available");
byte[] bytes = message.getBytes(StandardCharsets.UTF_8); byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = this.delegateSession.bufferFactory().wrap(bytes); DataBuffer buffer = this.delegateSession.bufferFactory().wrap(bytes);
WebSocketMessage webSocketMessage = new WebSocketMessage(Type.TEXT, buffer); WebSocketMessage webSocketMessage = new WebSocketMessage(Type.TEXT, buffer);
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); this.delegateSession.handleMessage(webSocketMessage);
}
} }
@OnWebSocketMessage @Override
public void onWebSocketBinary(ByteBuffer byteBuffer, Callback callback) { public void onWebSocketBinary(ByteBuffer byteBuffer, Callback callback) {
if (this.delegateSession != null) { Assert.state(this.delegateSession != null, "No delegate session available");
DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer); DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer);
buffer = new JettyDataBuffer(buffer, callback); buffer = new JettyCallbackDataBuffer(buffer, callback);
WebSocketMessage webSocketMessage = new WebSocketMessage(Type.BINARY, buffer); WebSocketMessage webSocketMessage = new WebSocketMessage(Type.BINARY, buffer);
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); this.delegateSession.handleMessage(webSocketMessage);
}
} }
@OnWebSocketFrame @Override
public void onWebSocketFrame(Frame frame, Callback callback) { public void onWebSocketPong(ByteBuffer payload) {
if (this.delegateSession != null) { Assert.state(this.delegateSession != null, "No delegate session available");
if (OpCode.PONG == frame.getOpCode()) { DataBuffer buffer = this.delegateSession.bufferFactory().wrap(BufferUtil.copy(payload));
ByteBuffer byteBuffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer);
buffer = new JettyDataBuffer(buffer, callback);
WebSocketMessage webSocketMessage = new WebSocketMessage(Type.PONG, buffer); WebSocketMessage webSocketMessage = new WebSocketMessage(Type.PONG, buffer);
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); this.delegateSession.handleMessage(webSocketMessage);
}
}
} }
@OnWebSocketClose @Override
public void onWebSocketClose(int statusCode, String reason) { public void onWebSocketClose(int statusCode, String reason) {
if (this.delegateSession != null) { Assert.state(this.delegateSession != null, "No delegate session available");
this.delegateSession.handleClose(CloseStatus.create(statusCode, reason)); this.delegateSession.handleClose(CloseStatus.create(statusCode, reason));
} }
}
@OnWebSocketError @Override
public void onWebSocketError(Throwable cause) { public void onWebSocketError(Throwable cause) {
if (this.delegateSession != null) { Assert.state(this.delegateSession != null, "No delegate session available");
this.delegateSession.handleError(cause); this.delegateSession.handleError(cause);
} }
}
private static final class JettyDataBuffer implements CloseableDataBuffer { private static final class JettyCallbackDataBuffer implements CloseableDataBuffer {
private final DataBuffer delegate; private final DataBuffer delegate;
private final Callback callback; private final Callback callback;
public JettyDataBuffer(DataBuffer delegate, Callback callback) {
public JettyCallbackDataBuffer(DataBuffer delegate, Callback callback) {
Assert.notNull(delegate, "'delegate` must not be null"); Assert.notNull(delegate, "'delegate` must not be null");
Assert.notNull(callback, "Callback must not be null"); Assert.notNull(callback, "Callback must not be null");
this.delegate = delegate; this.delegate = delegate;
@ -272,13 +253,13 @@ public class JettyWebSocketHandlerAdapter {
@Deprecated @Deprecated
public DataBuffer slice(int index, int length) { public DataBuffer slice(int index, int length) {
DataBuffer delegateSlice = this.delegate.slice(index, length); DataBuffer delegateSlice = this.delegate.slice(index, length);
return new JettyDataBuffer(delegateSlice, this.callback); return new JettyCallbackDataBuffer(delegateSlice, this.callback);
} }
@Override @Override
public DataBuffer split(int index) { public DataBuffer split(int index) {
DataBuffer delegateSplit = this.delegate.split(index); DataBuffer delegateSplit = this.delegate.split(index);
return new JettyDataBuffer(delegateSplit, this.callback); return new JettyCallbackDataBuffer(delegateSplit, this.callback);
} }
@Override @Override

View File

@ -16,18 +16,26 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.websocket.api.Callback; import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.HandshakeInfo;
@ -36,13 +44,29 @@ import org.springframework.web.reactive.socket.WebSocketSession;
/** /**
* Spring {@link WebSocketSession} implementation that adapts to a Jetty * Spring {@link WebSocketSession} implementation that adapts to a Jetty
* WebSocket {@link org.eclipse.jetty.websocket.api.Session}. * WebSocket {@link Session}.
* *
* @author Violeta Georgieva * @author Violeta Georgieva
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
*/ */
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> { public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
private final Flux<WebSocketMessage> flux;
private final Sinks.One<CloseStatus> closeStatusSink = Sinks.one();
private final Lock lock = new ReentrantLock();
private long requested = 0;
private boolean awaitingMessage = false;
@Nullable
private FluxSink<WebSocketMessage> sink;
@Nullable
private final Sinks.Empty<Void> handlerCompletionSink;
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
this(session, info, factory, null); this(session, info, factory, null);
@ -51,52 +75,88 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable Sinks.Empty<Void> completionSink) { @Nullable Sinks.Empty<Void> completionSink) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionSink); super(session, ObjectUtils.getIdentityHexString(session), info, factory);
// TODO: suspend causes failures if invoked at this stage this.handlerCompletionSink = completionSink;
// suspendReceiving(); this.flux = Flux.create(emitter -> {
this.sink = emitter;
emitter.onRequest(n -> {
boolean demand = false;
this.lock.lock();
try {
this.requested = Math.addExact(this.requested, n);
if (this.requested < 0L) {
this.requested = Long.MAX_VALUE;
} }
if (!this.awaitingMessage && this.requested > 0) {
@Override if (this.requested != Long.MAX_VALUE) {
protected boolean canSuspendReceiving() { this.requested--;
// Jetty 12 TODO: research suspend functionality in Jetty 12 }
return false; this.awaitingMessage = true;
demand = true;
}
}
finally {
this.lock.unlock();
} }
@Override if (demand) {
protected void suspendReceiving() { getDelegate().demand();
}
});
});
} }
@Override void handleMessage(WebSocketMessage message) {
protected void resumeReceiving() { Assert.state(this.sink != null, "No sink available");
this.sink.next(message);
boolean demand = false;
this.lock.lock();
try {
if (!this.awaitingMessage) {
throw new IllegalStateException();
}
this.awaitingMessage = false;
if (this.requested > 0) {
if (this.requested != Long.MAX_VALUE) {
this.requested--;
}
this.awaitingMessage = true;
demand = true;
}
}
finally {
this.lock.unlock();
} }
@Override if (demand) {
protected boolean sendMessage(WebSocketMessage message) throws IOException { getDelegate().demand();
DataBuffer dataBuffer = message.getPayload();
Session session = getDelegate();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = dataBuffer.toString(StandardCharsets.UTF_8);
session.sendText(text, new SendProcessorCallback());
}
else {
if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
}
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext()) {
ByteBuffer byteBuffer = iterator.next();
switch (message.getType()) {
case BINARY -> session.sendBinary(byteBuffer, new SendProcessorCallback());
case PING -> session.sendPing(byteBuffer, new SendProcessorCallback());
case PONG -> session.sendPong(byteBuffer, new SendProcessorCallback());
default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType());
} }
} }
void handleError(Throwable ex) {
}
void handleClose(CloseStatus closeStatus) {
this.closeStatusSink.tryEmitValue(closeStatus);
if (this.sink != null) {
this.sink.complete();
} }
} }
return true;
void onHandlerError(Throwable error) {
if (JettyWebSocketSession.this.handlerCompletionSink != null) {
JettyWebSocketSession.this.handlerCompletionSink.tryEmitError(error);
}
getDelegate().close(StatusCode.SERVER_ERROR, error.getMessage(), Callback.NOOP);
}
void onHandleComplete() {
if (JettyWebSocketSession.this.handlerCompletionSink != null) {
JettyWebSocketSession.this.handlerCompletionSink.tryEmitEmpty();
}
getDelegate().close(StatusCode.NORMAL, null, Callback.NOOP);
} }
@Override @Override
@ -108,25 +168,81 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
public Mono<Void> close(CloseStatus status) { public Mono<Void> close(CloseStatus status) {
Callback.Completable callback = new Callback.Completable(); Callback.Completable callback = new Callback.Completable();
getDelegate().close(status.getCode(), status.getReason(), callback); getDelegate().close(status.getCode(), status.getReason(), callback);
return Mono.fromFuture(callback); return Mono.fromFuture(callback);
} }
private final class SendProcessorCallback implements Callback {
@Override @Override
public void fail(Throwable x) { public Mono<CloseStatus> closeStatus() {
getSendProcessor().cancel(); return this.closeStatusSink.asMono();
getSendProcessor().onError(x);
} }
@Override @Override
public void succeed() { public Flux<WebSocketMessage> receive() {
getSendProcessor().setReadyToSend(true); return this.flux;
getSendProcessor().onWritePossible();
} }
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
return Flux.from(messages)
.flatMap(this::sendMessage, 1)
.then();
} }
protected Mono<Void> sendMessage(WebSocketMessage message) {
Callback.Completable completable = new Callback.Completable();
DataBuffer dataBuffer = message.getPayload();
Session session = getDelegate();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
String text = dataBuffer.toString(StandardCharsets.UTF_8);
session.sendText(text, completable);
}
else {
switch (message.getType()) {
case BINARY -> {
@SuppressWarnings("resource")
DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers();
new IteratingCallback() {
@Override
protected Action process() {
if (!iterator.hasNext()) {
return Action.SUCCEEDED;
}
ByteBuffer buffer = iterator.next();
boolean last = iterator.hasNext();
session.sendPartialBinary(buffer, last, Callback.from(this::succeeded, this::failed));
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess() {
iterator.close();
completable.succeed();
}
@Override
protected void onCompleteFailure(Throwable cause) {
iterator.close();
completable.fail(cause);
}
}.iterate();
}
case PING -> {
// Maximum size of Control frame payload is 125, per RFC 6455.
ByteBuffer buffer = BufferUtil.allocate(125);
dataBuffer.toByteBuffer(buffer);
session.sendPing(buffer, completable);
}
case PONG -> {
// Maximum size of Control frame payload is 125, per RFC 6455.
ByteBuffer buffer = BufferUtil.allocate(125);
dataBuffer.toByteBuffer(buffer);
session.sendPong(buffer, completable);
}
default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
return Mono.fromFuture(completable);
}
} }

View File

@ -0,0 +1,111 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.client;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.JettyUpgradeListener;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import org.springframework.context.Lifecycle;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
private final org.eclipse.jetty.websocket.client.WebSocketClient client;
public JettyWebSocketClient() {
this(new org.eclipse.jetty.websocket.client.WebSocketClient());
}
public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient client) {
this.client = client;
}
@Override
public void start() {
LifeCycle.start(this.client);
}
@Override
public void stop() {
LifeCycle.stop(this.client);
}
@Override
public boolean isRunning() {
return this.client.isRunning();
}
@Override
public Mono<Void> execute(URI url, WebSocketHandler handler) {
return execute(url, null, handler);
}
@Override
public Mono<Void> execute(URI url, @Nullable HttpHeaders headers, WebSocketHandler handler) {
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols(handler.getSubProtocols());
if (headers != null) {
headers.keySet().forEach(header -> upgradeRequest.setHeader(header, headers.getValuesAsList(header)));
}
final AtomicReference<HandshakeInfo> handshakeInfo = new AtomicReference<>();
JettyUpgradeListener jettyUpgradeListener = new JettyUpgradeListener() {
@Override
public void onHandshakeResponse(Request request, Response response) {
String protocol = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL);
HttpHeaders responseHeaders = new HttpHeaders();
response.getHeaders().forEach(header -> responseHeaders.add(header.getName(), header.getValue()));
handshakeInfo.set(new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol));
}
};
Sinks.Empty<Void> completion = Sinks.empty();
JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(handler, session ->
new JettyWebSocketSession(session, Objects.requireNonNull(handshakeInfo.get()), DefaultDataBufferFactory.sharedInstance, completion));
try {
this.client.connect(handlerAdapter, url, upgradeRequest, jettyUpgradeListener)
.exceptionally(throwable -> {
// Only fail the completion if we have an error
// as the JettyWebSocketSession will never be opened.
completion.tryEmitError(throwable);
return null;
});
return completion.asMono();
}
catch (IOException ex) {
return Mono.error(ex);
}
}
}

View File

@ -43,6 +43,7 @@ import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.WebSocketService; import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
@ -76,6 +77,8 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
private static final boolean jettyWsPresent; private static final boolean jettyWsPresent;
private static final boolean jettyCoreWsPresent;
private static final boolean undertowWsPresent; private static final boolean undertowWsPresent;
private static final boolean reactorNettyPresent; private static final boolean reactorNettyPresent;
@ -88,6 +91,8 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
"org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader); "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
jettyWsPresent = ClassUtils.isPresent( jettyWsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer", classLoader); "org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer", classLoader);
jettyCoreWsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.ServerWebSocketContainer", classLoader);
undertowWsPresent = ClassUtils.isPresent( undertowWsPresent = ClassUtils.isPresent(
"io.undertow.websockets.WebSocketProtocolHandshakeHandler", classLoader); "io.undertow.websockets.WebSocketProtocolHandshakeHandler", classLoader);
reactorNettyPresent = ClassUtils.isPresent( reactorNettyPresent = ClassUtils.isPresent(
@ -278,6 +283,9 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
else if (jettyWsPresent) { else if (jettyWsPresent) {
return new JettyRequestUpgradeStrategy(); return new JettyRequestUpgradeStrategy();
} }
else if (jettyCoreWsPresent) {
return new JettyCoreRequestUpgradeStrategy();
}
else if (undertowWsPresent) { else if (undertowWsPresent) {
return new UndertowRequestUpgradeStrategy(); return new UndertowRequestUpgradeStrategy();
} }

View File

@ -0,0 +1,127 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.Configurable;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketException;
import org.eclipse.jetty.websocket.server.ServerWebSocketContainer;
import org.eclipse.jetty.websocket.server.WebSocketCreator;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
/**
* A WebSocket {@code RequestUpgradeStrategy} for Jetty 12 Core.
*
* @author Rossen Stoyanchev
* @since 5.3.4
*/
public class JettyCoreRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Nullable
private Consumer<Configurable> webSocketConfigurer;
@Nullable
private ServerWebSocketContainer serverContainer;
/**
* Add a callback to configure WebSocket server parameters on
* {@link JettyWebSocketServerContainer}.
* @since 6.1
*/
public void addWebSocketConfigurer(Consumer<Configurable> webSocketConfigurer) {
this.webSocketConfigurer = (this.webSocketConfigurer != null ?
this.webSocketConfigurer.andThen(webSocketConfigurer) : webSocketConfigurer);
}
@Override
public Mono<Void> upgrade(
ServerWebExchange exchange, WebSocketHandler handler,
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
Request jettyRequest = ServerHttpRequestDecorator.getNativeRequest(request);
Response jettyResponse = ServerHttpResponseDecorator.getNativeResponse(response);
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
DataBufferFactory factory = response.bufferFactory();
// Trigger WebFlux preCommit actions before upgrade
return exchange.getResponse().setComplete()
.then(Mono.deferContextual(contextView -> {
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(
ContextWebSocketHandler.decorate(handler, contextView),
session -> new JettyWebSocketSession(session, handshakeInfo, factory));
WebSocketCreator webSocketCreator = (upgradeRequest, upgradeResponse, callback) -> {
if (subProtocol != null) {
upgradeResponse.setAcceptedSubProtocol(subProtocol);
}
return adapter;
};
Callback.Completable callback = new Callback.Completable();
Mono<Void> mono = Mono.fromFuture(callback);
ServerWebSocketContainer container = getWebSocketServerContainer(jettyRequest);
try {
if (!container.upgrade(webSocketCreator, jettyRequest, jettyResponse, callback)) {
throw new WebSocketException("request could not be upgraded to websocket");
}
}
catch (WebSocketException ex) {
callback.failed(ex);
}
return mono;
}));
}
private ServerWebSocketContainer getWebSocketServerContainer(Request jettyRequest) {
if (this.serverContainer == null) {
Server server = jettyRequest.getConnectionMetaData().getConnector().getServer();
ServerWebSocketContainer container = ServerWebSocketContainer.get(server.getContext());
if (this.webSocketConfigurer != null) {
this.webSocketConfigurer.accept(container);
}
this.serverContainer = container;
}
return this.serverContainer;
}
}

View File

@ -42,7 +42,7 @@ import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
/** /**
* A WebSocket {@code RequestUpgradeStrategy} for Jetty 11. * A WebSocket {@code RequestUpgradeStrategy} for Jetty 12 EE10.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.3.4 * @since 5.3.4

View File

@ -16,7 +16,12 @@
package org.springframework.web.reactive.result.method.annotation; package org.springframework.web.reactive.result.method.annotation;
import java.util.stream.Stream;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -28,10 +33,16 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Named.named;
/** /**
* Integration tests related to the use of context paths. * Integration tests related to the use of context paths.
@ -40,15 +51,25 @@ import static org.assertj.core.api.Assertions.assertThat;
*/ */
class ContextPathIntegrationTests { class ContextPathIntegrationTests {
@Test static Stream<Named<HttpServer>> httpServers() {
void multipleWebFluxApps() throws Exception { return Stream.of(
named("Jetty", new JettyHttpServer()),
named("Jetty Core", new JettyCoreHttpServer()),
named("Reactor Netty", new ReactorHttpServer()),
named("Tomcat", new TomcatHttpServer()),
named("Undertow", new UndertowHttpServer())
);
}
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("httpServers")
void multipleWebFluxApps(AbstractHttpServer server) throws Exception {
AnnotationConfigApplicationContext context1 = new AnnotationConfigApplicationContext(WebAppConfig.class); AnnotationConfigApplicationContext context1 = new AnnotationConfigApplicationContext(WebAppConfig.class);
AnnotationConfigApplicationContext context2 = new AnnotationConfigApplicationContext(WebAppConfig.class); AnnotationConfigApplicationContext context2 = new AnnotationConfigApplicationContext(WebAppConfig.class);
HttpHandler webApp1Handler = WebHttpHandlerBuilder.applicationContext(context1).build(); HttpHandler webApp1Handler = WebHttpHandlerBuilder.applicationContext(context1).build();
HttpHandler webApp2Handler = WebHttpHandlerBuilder.applicationContext(context2).build(); HttpHandler webApp2Handler = WebHttpHandlerBuilder.applicationContext(context2).build();
ReactorHttpServer server = new ReactorHttpServer();
server.registerHttpHandler("/webApp1", webApp1Handler); server.registerHttpHandler("/webApp1", webApp1Handler);
server.registerHttpHandler("/webApp2", webApp2Handler); server.registerHttpHandler("/webApp2", webApp2Handler);
server.afterPropertiesSet(); server.afterPropertiesSet();

View File

@ -53,6 +53,7 @@ import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
@ -127,7 +128,7 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@ParameterizedSseTest @ParameterizedSseTest
void sseAsEvent(HttpServer httpServer, ClientHttpConnector connector) throws Exception { void sseAsEvent(HttpServer httpServer, ClientHttpConnector connector) throws Exception {
assumeTrue(httpServer instanceof JettyHttpServer); assumeTrue(httpServer instanceof JettyHttpServer || httpServer instanceof JettyCoreHttpServer);
startServer(httpServer, connector); startServer(httpServer, connector);
@ -305,6 +306,9 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
args(new JettyHttpServer(), new ReactorClientHttpConnector()), args(new JettyHttpServer(), new ReactorClientHttpConnector()),
args(new JettyHttpServer(), new JettyClientHttpConnector()), args(new JettyHttpServer(), new JettyClientHttpConnector()),
args(new JettyHttpServer(), new HttpComponentsClientHttpConnector()), args(new JettyHttpServer(), new HttpComponentsClientHttpConnector()),
args(new JettyCoreHttpServer(), new ReactorClientHttpConnector()),
args(new JettyCoreHttpServer(), new JettyClientHttpConnector()),
args(new JettyCoreHttpServer(), new HttpComponentsClientHttpConnector()),
args(new ReactorHttpServer(), new ReactorClientHttpConnector()), args(new ReactorHttpServer(), new ReactorClientHttpConnector()),
args(new ReactorHttpServer(), new JettyClientHttpConnector()), args(new ReactorHttpServer(), new JettyClientHttpConnector()),
args(new ReactorHttpServer(), new HttpComponentsClientHttpConnector()), args(new ReactorHttpServer(), new HttpComponentsClientHttpConnector()),

View File

@ -45,6 +45,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.web.filter.reactive.ServerWebExchangeContextFilter; import org.springframework.web.filter.reactive.ServerWebExchangeContextFilter;
import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.client.JettyWebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.TomcatWebSocketClient; import org.springframework.web.reactive.socket.client.TomcatWebSocketClient;
import org.springframework.web.reactive.socket.client.UndertowWebSocketClient; import org.springframework.web.reactive.socket.client.UndertowWebSocketClient;
@ -53,6 +54,7 @@ import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.WebSocketService; import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
@ -61,6 +63,7 @@ import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpg
import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilter;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
@ -90,6 +93,7 @@ abstract class AbstractReactiveWebSocketIntegrationTests {
WebSocketClient[] clients = new WebSocketClient[] { WebSocketClient[] clients = new WebSocketClient[] {
new TomcatWebSocketClient(), new TomcatWebSocketClient(),
new JettyWebSocketClient(),
new ReactorNettyWebSocketClient(), new ReactorNettyWebSocketClient(),
new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY)) new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))
}; };
@ -97,6 +101,7 @@ abstract class AbstractReactiveWebSocketIntegrationTests {
Map<HttpServer, Class<?>> servers = new LinkedHashMap<>(); Map<HttpServer, Class<?>> servers = new LinkedHashMap<>();
servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class); servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class);
servers.put(new JettyHttpServer(), JettyConfig.class); servers.put(new JettyHttpServer(), JettyConfig.class);
servers.put(new JettyCoreHttpServer(), JettyCoreConfig.class);
servers.put(new ReactorHttpServer(), ReactorNettyConfig.class); servers.put(new ReactorHttpServer(), ReactorNettyConfig.class);
servers.put(new UndertowHttpServer(), UndertowConfig.class); servers.put(new UndertowHttpServer(), UndertowConfig.class);
@ -241,4 +246,12 @@ abstract class AbstractReactiveWebSocketIntegrationTests {
} }
} }
@Configuration
static class JettyCoreConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new JettyCoreRequestUpgradeStrategy();
}
}
} }

View File

@ -19,6 +19,7 @@ dependencies {
optional("org.eclipse.jetty.ee10:jetty-ee10-webapp") { optional("org.eclipse.jetty.ee10:jetty-ee10-webapp") {
exclude group: "jakarta.servlet", module: "jakarta.servlet-api" exclude group: "jakarta.servlet", module: "jakarta.servlet-api"
} }
optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-api")
optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jakarta-server") optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jakarta-server")
optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") { optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") {
exclude group: "jakarta.servlet", module: "jakarta.servlet-api" exclude group: "jakarta.servlet", module: "jakarta.servlet-api"

View File

@ -20,17 +20,11 @@ import java.nio.ByteBuffer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Callback; import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Frame;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.core.OpCode;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.CloseStatus;
@ -40,23 +34,22 @@ import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator; import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator;
/** /**
* Adapts {@link WebSocketHandler} to the Jetty WebSocket API. * Adapts {@link WebSocketHandler} to the Jetty WebSocket API {@link org.eclipse.jetty.websocket.api.Session.Listener}.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*/ */
@WebSocket public class JettyWebSocketHandlerAdapter implements Session.Listener {
public class JettyWebSocketHandlerAdapter {
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
private static final Log logger = LogFactory.getLog(JettyWebSocketHandlerAdapter.class); private static final Log logger = LogFactory.getLog(JettyWebSocketHandlerAdapter.class);
private final WebSocketHandler webSocketHandler; private final WebSocketHandler webSocketHandler;
private final JettyWebSocketSession wsSession; private final JettyWebSocketSession wsSession;
@Nullable
private Session nativeSession;
public JettyWebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebSocketSession wsSession) { public JettyWebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebSocketSession wsSession) {
Assert.notNull(webSocketHandler, "WebSocketHandler must not be null"); Assert.notNull(webSocketHandler, "WebSocketHandler must not be null");
@ -65,69 +58,60 @@ public class JettyWebSocketHandlerAdapter {
this.wsSession = wsSession; this.wsSession = wsSession;
} }
@Override
@OnWebSocketOpen
public void onWebSocketOpen(Session session) { public void onWebSocketOpen(Session session) {
try { try {
this.nativeSession = session;
this.wsSession.initializeNativeSession(session); this.wsSession.initializeNativeSession(session);
this.webSocketHandler.afterConnectionEstablished(this.wsSession); this.webSocketHandler.afterConnectionEstablished(this.wsSession);
this.nativeSession.demand();
} }
catch (Exception ex) { catch (Exception ex) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); tryCloseWithError(ex);
} }
} }
@OnWebSocketMessage @Override
public void onWebSocketText(String payload) { public void onWebSocketText(String payload) {
Assert.state(this.nativeSession != null, "No native session available");
TextMessage message = new TextMessage(payload); TextMessage message = new TextMessage(payload);
try { try {
this.webSocketHandler.handleMessage(this.wsSession, message); this.webSocketHandler.handleMessage(this.wsSession, message);
this.nativeSession.demand();
} }
catch (Exception ex) { catch (Exception ex) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); tryCloseWithError(ex);
} }
} }
@OnWebSocketMessage @Override
public void onWebSocketBinary(ByteBuffer payload, Callback callback) { public void onWebSocketBinary(ByteBuffer payload, Callback callback) {
BinaryMessage message = new BinaryMessage(copyByteBuffer(payload), true); Assert.state(this.nativeSession != null, "No native session available");
BinaryMessage message = new BinaryMessage(BufferUtil.copy(payload), true);
callback.succeed();
try { try {
this.webSocketHandler.handleMessage(this.wsSession, message); this.webSocketHandler.handleMessage(this.wsSession, message);
callback.succeed(); this.nativeSession.demand();
} }
catch (Exception ex) { catch (Exception ex) {
callback.fail(ex); tryCloseWithError(ex);
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
} }
} }
@OnWebSocketFrame @Override
public void onWebSocketFrame(Frame frame, Callback callback) { public void onWebSocketPong(ByteBuffer payload) {
if (OpCode.PONG == frame.getOpCode()) { Assert.state(this.nativeSession != null, "No native session available");
ByteBuffer payload = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; PongMessage message = new PongMessage(BufferUtil.copy(payload));
PongMessage message = new PongMessage(copyByteBuffer(payload));
try { try {
this.webSocketHandler.handleMessage(this.wsSession, message); this.webSocketHandler.handleMessage(this.wsSession, message);
callback.succeed(); this.nativeSession.demand();
} }
catch (Exception ex) { catch (Exception ex) {
callback.fail(ex); tryCloseWithError(ex);
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
}
}
else {
callback.succeed();
} }
} }
private static ByteBuffer copyByteBuffer(ByteBuffer src) { @Override
ByteBuffer dest = ByteBuffer.allocate(src.remaining());
dest.put(src);
dest.flip();
return dest;
}
@OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) { public void onWebSocketClose(int statusCode, String reason) {
CloseStatus closeStatus = new CloseStatus(statusCode, reason); CloseStatus closeStatus = new CloseStatus(statusCode, reason);
try { try {
@ -135,19 +119,32 @@ public class JettyWebSocketHandlerAdapter {
} }
catch (Exception ex) { catch (Exception ex) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("Unhandled exception after connection closed for " + this, ex); logger.warn("Unhandled exception from afterConnectionClosed for " + this, ex);
} }
} }
} }
@OnWebSocketError @Override
public void onWebSocketError(Throwable cause) { public void onWebSocketError(Throwable cause) {
try { try {
this.webSocketHandler.handleTransportError(this.wsSession, cause); this.webSocketHandler.handleTransportError(this.wsSession, cause);
} }
catch (Exception ex) { catch (Exception ex) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); if (logger.isWarnEnabled()) {
logger.warn("Unhandled exception from handleTransportError for " + this, ex);
}
} }
} }
private void tryCloseWithError(Throwable t) {
if (this.nativeSession != null) {
if (this.nativeSession.isOpen()) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
}
else {
// Session might be O-SHUT waiting for response close frame, so abort to close the connection.
this.nativeSession.disconnect();
}
}
}
} }

View File

@ -45,7 +45,7 @@ import org.springframework.web.socket.server.HandshakeFailureException;
import org.springframework.web.socket.server.RequestUpgradeStrategy; import org.springframework.web.socket.server.RequestUpgradeStrategy;
/** /**
* A {@link RequestUpgradeStrategy} for Jetty 11. * A {@link RequestUpgradeStrategy} for Jetty 12 EE10.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.3.4 * @since 5.3.4