diff --git a/build.gradle b/build.gradle index e5fcb7d86b..c01ca46e3d 100644 --- a/build.gradle +++ b/build.gradle @@ -84,6 +84,10 @@ configure([rootProject] + javaProjects) { project -> systemProperty("java.awt.headless", "true") systemProperty("testGroups", project.properties.get("testGroups")) systemProperty("io.netty.leakDetection.level", "paranoid") + systemProperty("io.netty5.leakDetectionLevel", "paranoid") + systemProperty("io.netty5.leakDetection.targetRecords", "32") + systemProperty("io.netty5.buffer.lifecycleTracingEnabled", "true") + systemProperty("io.netty5.buffer.leakDetectionEnabled", "true") jvmArgs(["--add-opens=java.base/java.lang=ALL-UNNAMED", "--add-opens=java.base/java.util=ALL-UNNAMED"]) } diff --git a/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java index c2b4eb3243..2260d74ecf 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java @@ -87,7 +87,11 @@ class Netty5BufferDecoderTests extends AbstractDecoderTests } private Consumer expectByteBuffer(Buffer expected) { - return actual -> assertThat(actual).isEqualTo(expected); + return actual -> { + try (actual; expected) { + assertThat(actual).isEqualTo(expected); + } + }; } } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index db2e6991cf..cfa25833df 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -534,8 +534,12 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { ByteBuffer result = buffer.asByteBuffer(1, 2); assertThat(result.capacity()).isEqualTo(2); - assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, - "Netty 5 does share the internal buffer"); + boolean isNetty5DataBufferFactory = bufferFactory instanceof Netty5DataBufferFactory; + if (isNetty5DataBufferFactory) { + DataBufferUtils.release(buffer); + } + + assumeFalse(isNetty5DataBufferFactory, "Netty 5 does share the internal buffer"); buffer.write((byte) 'c'); assertThat(result.remaining()).isEqualTo(2); @@ -774,6 +778,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { assertThat(result).isEqualTo(bytes); + if (bufferFactory instanceof Netty5DataBufferFactory) { + release(slice); + } release(buffer); } diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java index cbd00f32f6..fd30341a67 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.util.Map; import java.util.function.Consumer; +import io.netty5.buffer.Buffer; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -209,7 +210,13 @@ public abstract class AbstractDecoderTests> extends Abstrac Flux buffer = Mono.from(input).concatWith(Flux.error(new InputException())); assertThatExceptionOfType(InputException.class).isThrownBy(() -> - this.decoder.decode(buffer, outputType, mimeType, hints).blockLast(Duration.ofSeconds(5))); + this.decoder.decode(buffer, outputType, mimeType, hints) + .doOnNext(o -> { + if (o instanceof Buffer buf) { + buf.close(); + } + }) + .blockLast(Duration.ofSeconds(5))); } /** @@ -226,7 +233,12 @@ public abstract class AbstractDecoderTests> extends Abstrac protected void testDecodeCancel(Publisher input, ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - Flux result = this.decoder.decode(input, outputType, mimeType, hints); + Flux result = this.decoder.decode(input, outputType, mimeType, hints) + .doOnNext(o -> { + if (o instanceof Buffer buf) { + buf.close(); + } + }); StepVerifier.create(result).expectNextCount(1).thenCancel().verify(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java index b7d4de6937..e351eab724 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -128,8 +129,15 @@ public class WebSocketMessage { * @see DataBufferUtils#retain(DataBuffer) */ public WebSocketMessage retain() { - DataBufferUtils.retain(this.payload); - return this; + if (!(this.nativeMessage instanceof io.netty5.handler.codec.http.websocketx.WebSocketFrame frame) ) { + DataBufferUtils.retain(this.payload); + return this; + } + else { + io.netty5.handler.codec.http.websocketx.WebSocketFrame newFrame = frame.send().receive(); + DataBuffer newPayload = ((Netty5DataBufferFactory) this.payload.factory()).wrap(newFrame.binaryData()); + return new WebSocketMessage(this.type, newPayload, newFrame); + } } /** diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Netty5WebSocketSessionSupport.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Netty5WebSocketSessionSupport.java index 0107013567..8fcfff5eda 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Netty5WebSocketSessionSupport.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Netty5WebSocketSessionSupport.java @@ -75,9 +75,8 @@ public abstract class Netty5WebSocketSessionSupport extends AbstractWebSocket protected WebSocketMessage toMessage(WebSocketFrame frame) { - WebSocketFrame newFrame = frame.send().receive(); - DataBuffer payload = bufferFactory().wrap(newFrame.binaryData()); - return new WebSocketMessage(messageTypes.get(newFrame.getClass()), payload, newFrame); + DataBuffer payload = bufferFactory().wrap(frame.binaryData()); + return new WebSocketMessage(messageTypes.get(frame.getClass()), payload, frame); } protected WebSocketFrame toFrame(WebSocketMessage message) { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index 2b0380aea9..d3cffbd966 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -220,7 +220,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { return Mono.deferContextual(contextView -> { String key = ServerWebExchangeContextFilter.EXCHANGE_CONTEXT_ATTRIBUTE; assertThat(contextView.getOrEmpty(key).orElse(null)).isNotNull(); - return session.send(session.receive().doOnNext(WebSocketMessage::retain)); + return session.send(session.receive().map(WebSocketMessage::retain)); }); } }