Polishing contribution

Closes gh-29310
This commit is contained in:
rstoyanchev 2022-10-19 13:22:26 +01:00
parent 997d1b3d7e
commit bd59c7a691
4 changed files with 39 additions and 25 deletions

View File

@ -524,7 +524,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void asByteBufferIndexLength(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
@ -534,12 +533,10 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
ByteBuffer result = buffer.asByteBuffer(1, 2);
assertThat(result.capacity()).isEqualTo(2);
boolean isNetty5DataBufferFactory = bufferFactory instanceof Netty5DataBufferFactory;
if (isNetty5DataBufferFactory) {
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, () -> {
DataBufferUtils.release(buffer);
}
assumeFalse(isNetty5DataBufferFactory, "Netty 5 does share the internal buffer");
return "Netty 5 does share the internal buffer";
});
buffer.write((byte) 'c');
assertThat(result.remaining()).isEqualTo(2);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -208,12 +208,12 @@ public abstract class AbstractDecoderTests<D extends Decoder<?>> extends Abstrac
protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<DataBuffer> buffer = Mono.from(input).concatWith(Flux.error(new InputException()));
Flux<DataBuffer> flux = Mono.from(input).concatWith(Flux.error(new InputException()));
assertThatExceptionOfType(InputException.class).isThrownBy(() ->
this.decoder.decode(buffer, outputType, mimeType, hints)
.doOnNext(o -> {
if (o instanceof Buffer buf) {
buf.close();
this.decoder.decode(flux, outputType, mimeType, hints)
.doOnNext(object -> {
if (object instanceof Buffer buffer) {
buffer.close();
}
})
.blockLast(Duration.ofSeconds(5)));
@ -234,9 +234,9 @@ public abstract class AbstractDecoderTests<D extends Decoder<?>> extends Abstrac
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints)
.doOnNext(o -> {
if (o instanceof Buffer buf) {
buf.close();
.doOnNext(object -> {
if (object instanceof Buffer buffer) {
buffer.close();
}
});
StepVerifier.create(result).expectNextCount(1).thenCancel().verify();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,6 +24,7 @@ import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
/**
@ -37,6 +38,10 @@ import org.springframework.util.ObjectUtils;
*/
public class WebSocketMessage {
private static final boolean reactorNetty2Present = ClassUtils.isPresent(
"io.netty5.handler.codec.http.websocketx.WebSocketFrame", WebSocketMessage.class.getClassLoader());
private final Type type;
private final DataBuffer payload;
@ -129,15 +134,11 @@ public class WebSocketMessage {
* @see DataBufferUtils#retain(DataBuffer)
*/
public WebSocketMessage retain() {
if (!(this.nativeMessage instanceof io.netty5.handler.codec.http.websocketx.WebSocketFrame frame) ) {
DataBufferUtils.retain(this.payload);
return this;
}
else {
io.netty5.handler.codec.http.websocketx.WebSocketFrame newFrame = frame.send().receive();
DataBuffer newPayload = ((Netty5DataBufferFactory) this.payload.factory()).wrap(newFrame.binaryData());
return new WebSocketMessage(this.type, newPayload, newFrame);
if (reactorNetty2Present) {
return ReactorNetty2Helper.retain(this);
}
DataBufferUtils.retain(this.payload);
return this;
}
/**
@ -199,4 +200,20 @@ public class WebSocketMessage {
PONG
}
private static class ReactorNetty2Helper {
static WebSocketMessage retain(WebSocketMessage message) {
if (message.nativeMessage instanceof io.netty5.handler.codec.http.websocketx.WebSocketFrame netty5Frame) {
io.netty5.handler.codec.http.websocketx.WebSocketFrame frame = netty5Frame.send().receive();
DataBuffer payload = ((Netty5DataBufferFactory) message.payload.factory()).wrap(frame.binaryData());
return new WebSocketMessage(message.type, payload, frame);
}
else {
DataBufferUtils.retain(message.payload);
return message;
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.