From bd59c7a6911a004fa35bd2d568b13fa3ed880a72 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Wed, 19 Oct 2022 13:22:26 +0100 Subject: [PATCH] Polishing contribution Closes gh-29310 --- .../core/io/buffer/DataBufferTests.java | 9 ++--- .../codec/AbstractDecoderTests.java | 18 +++++----- .../web/reactive/socket/WebSocketMessage.java | 35 ++++++++++++++----- .../socket/WebSocketIntegrationTests.java | 2 +- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index cfa25833df..8d389fb473 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -524,7 +524,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest - @SuppressWarnings("deprecation") void asByteBufferIndexLength(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -534,12 +533,10 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { ByteBuffer result = buffer.asByteBuffer(1, 2); assertThat(result.capacity()).isEqualTo(2); - boolean isNetty5DataBufferFactory = bufferFactory instanceof Netty5DataBufferFactory; - if (isNetty5DataBufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, () -> { DataBufferUtils.release(buffer); - } - - assumeFalse(isNetty5DataBufferFactory, "Netty 5 does share the internal buffer"); + return "Netty 5 does share the internal buffer"; + }); buffer.write((byte) 'c'); assertThat(result.remaining()).isEqualTo(2); diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java index fd30341a67..97454ed162 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -208,12 +208,12 @@ public abstract class AbstractDecoderTests> extends Abstrac protected void testDecodeError(Publisher input, ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - Flux buffer = Mono.from(input).concatWith(Flux.error(new InputException())); + Flux flux = Mono.from(input).concatWith(Flux.error(new InputException())); assertThatExceptionOfType(InputException.class).isThrownBy(() -> - this.decoder.decode(buffer, outputType, mimeType, hints) - .doOnNext(o -> { - if (o instanceof Buffer buf) { - buf.close(); + this.decoder.decode(flux, outputType, mimeType, hints) + .doOnNext(object -> { + if (object instanceof Buffer buffer) { + buffer.close(); } }) .blockLast(Duration.ofSeconds(5))); @@ -234,9 +234,9 @@ public abstract class AbstractDecoderTests> extends Abstrac @Nullable MimeType mimeType, @Nullable Map hints) { Flux result = this.decoder.decode(input, outputType, mimeType, hints) - .doOnNext(o -> { - if (o instanceof Buffer buf) { - buf.close(); + .doOnNext(object -> { + if (object instanceof Buffer buffer) { + buffer.close(); } }); StepVerifier.create(result).expectNextCount(1).thenCancel().verify(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java index e351eab724..fe0c02d95a 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.Netty5DataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; /** @@ -37,6 +38,10 @@ import org.springframework.util.ObjectUtils; */ public class WebSocketMessage { + private static final boolean reactorNetty2Present = ClassUtils.isPresent( + "io.netty5.handler.codec.http.websocketx.WebSocketFrame", WebSocketMessage.class.getClassLoader()); + + private final Type type; private final DataBuffer payload; @@ -129,15 +134,11 @@ public class WebSocketMessage { * @see DataBufferUtils#retain(DataBuffer) */ public WebSocketMessage retain() { - if (!(this.nativeMessage instanceof io.netty5.handler.codec.http.websocketx.WebSocketFrame frame) ) { - DataBufferUtils.retain(this.payload); - return this; - } - else { - io.netty5.handler.codec.http.websocketx.WebSocketFrame newFrame = frame.send().receive(); - DataBuffer newPayload = ((Netty5DataBufferFactory) this.payload.factory()).wrap(newFrame.binaryData()); - return new WebSocketMessage(this.type, newPayload, newFrame); + if (reactorNetty2Present) { + return ReactorNetty2Helper.retain(this); } + DataBufferUtils.retain(this.payload); + return this; } /** @@ -199,4 +200,20 @@ public class WebSocketMessage { PONG } + + private static class ReactorNetty2Helper { + + static WebSocketMessage retain(WebSocketMessage message) { + if (message.nativeMessage instanceof io.netty5.handler.codec.http.websocketx.WebSocketFrame netty5Frame) { + io.netty5.handler.codec.http.websocketx.WebSocketFrame frame = netty5Frame.send().receive(); + DataBuffer payload = ((Netty5DataBufferFactory) message.payload.factory()).wrap(frame.binaryData()); + return new WebSocketMessage(message.type, payload, frame); + } + else { + DataBufferUtils.retain(message.payload); + return message; + } + } + } + } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index d3cffbd966..ec81cab03e 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.