parent
57bbc09fca
commit
997d1b3d7e
|
@ -84,6 +84,10 @@ configure([rootProject] + javaProjects) { project ->
|
|||
systemProperty("java.awt.headless", "true")
|
||||
systemProperty("testGroups", project.properties.get("testGroups"))
|
||||
systemProperty("io.netty.leakDetection.level", "paranoid")
|
||||
systemProperty("io.netty5.leakDetectionLevel", "paranoid")
|
||||
systemProperty("io.netty5.leakDetection.targetRecords", "32")
|
||||
systemProperty("io.netty5.buffer.lifecycleTracingEnabled", "true")
|
||||
systemProperty("io.netty5.buffer.leakDetectionEnabled", "true")
|
||||
jvmArgs(["--add-opens=java.base/java.lang=ALL-UNNAMED",
|
||||
"--add-opens=java.base/java.util=ALL-UNNAMED"])
|
||||
}
|
||||
|
|
|
@ -87,7 +87,11 @@ class Netty5BufferDecoderTests extends AbstractDecoderTests<Netty5BufferDecoder>
|
|||
}
|
||||
|
||||
private Consumer<Buffer> expectByteBuffer(Buffer expected) {
|
||||
return actual -> assertThat(actual).isEqualTo(expected);
|
||||
return actual -> {
|
||||
try (actual; expected) {
|
||||
assertThat(actual).isEqualTo(expected);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -534,8 +534,12 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
|
|||
ByteBuffer result = buffer.asByteBuffer(1, 2);
|
||||
assertThat(result.capacity()).isEqualTo(2);
|
||||
|
||||
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory,
|
||||
"Netty 5 does share the internal buffer");
|
||||
boolean isNetty5DataBufferFactory = bufferFactory instanceof Netty5DataBufferFactory;
|
||||
if (isNetty5DataBufferFactory) {
|
||||
DataBufferUtils.release(buffer);
|
||||
}
|
||||
|
||||
assumeFalse(isNetty5DataBufferFactory, "Netty 5 does share the internal buffer");
|
||||
|
||||
buffer.write((byte) 'c');
|
||||
assertThat(result.remaining()).isEqualTo(2);
|
||||
|
@ -774,6 +778,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
|
|||
|
||||
assertThat(result).isEqualTo(bytes);
|
||||
|
||||
if (bufferFactory instanceof Netty5DataBufferFactory) {
|
||||
release(slice);
|
||||
}
|
||||
release(buffer);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.time.Duration;
|
|||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import io.netty5.buffer.Buffer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -209,7 +210,13 @@ public abstract class AbstractDecoderTests<D extends Decoder<?>> extends Abstrac
|
|||
|
||||
Flux<DataBuffer> buffer = Mono.from(input).concatWith(Flux.error(new InputException()));
|
||||
assertThatExceptionOfType(InputException.class).isThrownBy(() ->
|
||||
this.decoder.decode(buffer, outputType, mimeType, hints).blockLast(Duration.ofSeconds(5)));
|
||||
this.decoder.decode(buffer, outputType, mimeType, hints)
|
||||
.doOnNext(o -> {
|
||||
if (o instanceof Buffer buf) {
|
||||
buf.close();
|
||||
}
|
||||
})
|
||||
.blockLast(Duration.ofSeconds(5)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -226,7 +233,12 @@ public abstract class AbstractDecoderTests<D extends Decoder<?>> extends Abstrac
|
|||
protected void testDecodeCancel(Publisher<DataBuffer> input, ResolvableType outputType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints);
|
||||
Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints)
|
||||
.doOnNext(o -> {
|
||||
if (o instanceof Buffer buf) {
|
||||
buf.close();
|
||||
}
|
||||
});
|
||||
StepVerifier.create(result).expectNextCount(1).thenCancel().verify();
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
|
|||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
@ -128,8 +129,15 @@ public class WebSocketMessage {
|
|||
* @see DataBufferUtils#retain(DataBuffer)
|
||||
*/
|
||||
public WebSocketMessage retain() {
|
||||
DataBufferUtils.retain(this.payload);
|
||||
return this;
|
||||
if (!(this.nativeMessage instanceof io.netty5.handler.codec.http.websocketx.WebSocketFrame frame) ) {
|
||||
DataBufferUtils.retain(this.payload);
|
||||
return this;
|
||||
}
|
||||
else {
|
||||
io.netty5.handler.codec.http.websocketx.WebSocketFrame newFrame = frame.send().receive();
|
||||
DataBuffer newPayload = ((Netty5DataBufferFactory) this.payload.factory()).wrap(newFrame.binaryData());
|
||||
return new WebSocketMessage(this.type, newPayload, newFrame);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -75,9 +75,8 @@ public abstract class Netty5WebSocketSessionSupport<T> extends AbstractWebSocket
|
|||
|
||||
|
||||
protected WebSocketMessage toMessage(WebSocketFrame frame) {
|
||||
WebSocketFrame newFrame = frame.send().receive();
|
||||
DataBuffer payload = bufferFactory().wrap(newFrame.binaryData());
|
||||
return new WebSocketMessage(messageTypes.get(newFrame.getClass()), payload, newFrame);
|
||||
DataBuffer payload = bufferFactory().wrap(frame.binaryData());
|
||||
return new WebSocketMessage(messageTypes.get(frame.getClass()), payload, frame);
|
||||
}
|
||||
|
||||
protected WebSocketFrame toFrame(WebSocketMessage message) {
|
||||
|
|
|
@ -220,7 +220,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
|
|||
return Mono.deferContextual(contextView -> {
|
||||
String key = ServerWebExchangeContextFilter.EXCHANGE_CONTEXT_ATTRIBUTE;
|
||||
assertThat(contextView.getOrEmpty(key).orElse(null)).isNotNull();
|
||||
return session.send(session.receive().doOnNext(WebSocketMessage::retain));
|
||||
return session.send(session.receive().map(WebSocketMessage::retain));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue