Consistent formatting and related polishing
This commit is contained in:
parent
60b72d721d
commit
17930d6c27
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -17,11 +17,12 @@
|
||||||
package org.springframework.messaging;
|
package org.springframework.messaging;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contract for handling a {@link Message}.
|
* Simple contract for handling a {@link Message}.
|
||||||
*
|
*
|
||||||
* @author Mark Fisher
|
* @author Mark Fisher
|
||||||
* @author Iwein Fuld
|
* @author Iwein Fuld
|
||||||
* @since 4.0
|
* @since 4.0
|
||||||
|
* @see ReactiveMessageHandler
|
||||||
*/
|
*/
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface MessageHandler {
|
public interface MessageHandler {
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging;
|
package org.springframework.messaging;
|
||||||
|
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
@ -22,6 +23,7 @@ import reactor.core.publisher.Mono;
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
* @since 5.2
|
* @since 5.2
|
||||||
|
* @see MessageHandler
|
||||||
*/
|
*/
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface ReactiveMessageHandler {
|
public interface ReactiveMessageHandler {
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler;
|
package org.springframework.messaging.handler;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.springframework.util.Assert;
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
* @since 5.2
|
* @since 5.2
|
||||||
*
|
|
||||||
* @see HeadersMethodArgumentResolver
|
* @see HeadersMethodArgumentResolver
|
||||||
* @see NativeMessageHeaderAccessor
|
* @see NativeMessageHeaderAccessor
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.annotation.support.reactive;
|
package org.springframework.messaging.handler.annotation.support.reactive;
|
||||||
|
|
||||||
import java.lang.reflect.AnnotatedElement;
|
import java.lang.reflect.AnnotatedElement;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.annotation.support.reactive;
|
package org.springframework.messaging.handler.annotation.support.reactive;
|
||||||
|
|
||||||
import java.lang.annotation.Annotation;
|
import java.lang.annotation.Annotation;
|
||||||
|
@ -90,7 +91,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
|
||||||
public PayloadMethodArgumentResolver(List<? extends Decoder<?>> decoders, @Nullable Validator validator,
|
public PayloadMethodArgumentResolver(List<? extends Decoder<?>> decoders, @Nullable Validator validator,
|
||||||
@Nullable ReactiveAdapterRegistry registry, boolean useDefaultResolution) {
|
@Nullable ReactiveAdapterRegistry registry, boolean useDefaultResolution) {
|
||||||
|
|
||||||
Assert.isTrue(!CollectionUtils.isEmpty(decoders), "At least one Decoder is required.");
|
Assert.isTrue(!CollectionUtils.isEmpty(decoders), "At least one Decoder is required");
|
||||||
this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders));
|
this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders));
|
||||||
this.validator = validator;
|
this.validator = validator;
|
||||||
this.adapterRegistry = registry != null ? registry : ReactiveAdapterRegistry.getSharedInstance();
|
this.adapterRegistry = registry != null ? registry : ReactiveAdapterRegistry.getSharedInstance();
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.invocation.reactive;
|
package org.springframework.messaging.handler.invocation.reactive;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.invocation.reactive;
|
package org.springframework.messaging.handler.invocation.reactive;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
@ -102,7 +103,7 @@ public abstract class AbstractMethodMessageHandler<T>
|
||||||
* Configure custom resolvers for handler method arguments.
|
* Configure custom resolvers for handler method arguments.
|
||||||
*/
|
*/
|
||||||
public void setArgumentResolverConfigurer(ArgumentResolverConfigurer configurer) {
|
public void setArgumentResolverConfigurer(ArgumentResolverConfigurer configurer) {
|
||||||
Assert.notNull(configurer, "HandlerMethodArgumentResolver is required.");
|
Assert.notNull(configurer, "HandlerMethodArgumentResolver is required");
|
||||||
this.argumentResolverConfigurer = configurer;
|
this.argumentResolverConfigurer = configurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +118,7 @@ public abstract class AbstractMethodMessageHandler<T>
|
||||||
* Configure custom return value handlers for handler metohds.
|
* Configure custom return value handlers for handler metohds.
|
||||||
*/
|
*/
|
||||||
public void setReturnValueHandlerConfigurer(ReturnValueHandlerConfigurer configurer) {
|
public void setReturnValueHandlerConfigurer(ReturnValueHandlerConfigurer configurer) {
|
||||||
Assert.notNull(configurer, "ReturnValueHandlerConfigurer is required.");
|
Assert.notNull(configurer, "ReturnValueHandlerConfigurer is required");
|
||||||
this.returnValueHandlerConfigurer = configurer;
|
this.returnValueHandlerConfigurer = configurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -450,7 +451,6 @@ public abstract class AbstractMethodMessageHandler<T>
|
||||||
* @param destination the destination
|
* @param destination the destination
|
||||||
* @param message the message
|
* @param message the message
|
||||||
*/
|
*/
|
||||||
@Nullable
|
|
||||||
protected void handleNoMatch(@Nullable String destination, Message<?> message) {
|
protected void handleNoMatch(@Nullable String destination, Message<?> message) {
|
||||||
logger.debug("No handlers for destination '" + destination + "'");
|
logger.debug("No handlers for destination '" + destination + "'");
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.invocation.reactive;
|
package org.springframework.messaging.handler.invocation.reactive;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.invocation.reactive;
|
package org.springframework.messaging.handler.invocation.reactive;
|
||||||
|
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.invocation.reactive;
|
package org.springframework.messaging.handler.invocation.reactive;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.invocation.reactive;
|
package org.springframework.messaging.handler.invocation.reactive;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.handler.invocation.reactive;
|
package org.springframework.messaging.handler.invocation.reactive;
|
||||||
|
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import io.rsocket.RSocket;
|
import io.rsocket.RSocket;
|
||||||
|
|
|
@ -43,16 +43,15 @@ public class RSocketRequesterMethodArgumentResolver implements HandlerMethodArgu
|
||||||
@Override
|
@Override
|
||||||
public boolean supportsParameter(MethodParameter parameter) {
|
public boolean supportsParameter(MethodParameter parameter) {
|
||||||
Class<?> type = parameter.getParameterType();
|
Class<?> type = parameter.getParameterType();
|
||||||
return RSocketRequester.class.equals(type) || RSocket.class.isAssignableFrom(type);
|
return (RSocketRequester.class.equals(type) || RSocket.class.isAssignableFrom(type));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) {
|
public Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) {
|
||||||
|
|
||||||
Object headerValue = message.getHeaders().get(RSOCKET_REQUESTER_HEADER);
|
Object headerValue = message.getHeaders().get(RSOCKET_REQUESTER_HEADER);
|
||||||
Assert.notNull(headerValue, "Missing '" + RSOCKET_REQUESTER_HEADER + "'");
|
Assert.notNull(headerValue, "Missing '" + RSOCKET_REQUESTER_HEADER + "'");
|
||||||
Assert.isInstanceOf(RSocketRequester.class, headerValue, "Expected header value of type RSocketRequester");
|
|
||||||
|
|
||||||
|
Assert.isInstanceOf(RSocketRequester.class, headerValue, "Expected header value of type RSocketRequester");
|
||||||
RSocketRequester requester = (RSocketRequester) headerValue;
|
RSocketRequester requester = (RSocketRequester) headerValue;
|
||||||
|
|
||||||
Class<?> type = parameter.getParameterType();
|
Class<?> type = parameter.getParameterType();
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -212,7 +213,6 @@ public class DefaultRSocketRequesterTests {
|
||||||
@Nullable private volatile Payload savedPayload;
|
@Nullable private volatile Payload savedPayload;
|
||||||
@Nullable private volatile Flux<Payload> savedPayloadFlux;
|
@Nullable private volatile Flux<Payload> savedPayloadFlux;
|
||||||
|
|
||||||
|
|
||||||
void setPayloadMonoToReturn(Mono<Payload> payloadMonoToReturn) {
|
void setPayloadMonoToReturn(Mono<Payload> payloadMonoToReturn) {
|
||||||
this.payloadMonoToReturn = payloadMonoToReturn;
|
this.payloadMonoToReturn = payloadMonoToReturn;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -59,12 +60,10 @@ class FireAndForgetCountingInterceptor extends AbstractRSocket implements RSocke
|
||||||
|
|
||||||
private final AtomicInteger fireAndForget = new AtomicInteger(0);
|
private final AtomicInteger fireAndForget = new AtomicInteger(0);
|
||||||
|
|
||||||
|
|
||||||
CountingDecorator(RSocket delegate) {
|
CountingDecorator(RSocket delegate) {
|
||||||
super(delegate);
|
super(delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public int getFireAndForgetCount() {
|
public int getFireAndForgetCount() {
|
||||||
return this.fireAndForget.get();
|
return this.fireAndForget.get();
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
@ -85,7 +86,6 @@ public class RSocketBufferLeakTests {
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
@SuppressWarnings("ConstantConditions")
|
@SuppressWarnings("ConstantConditions")
|
||||||
public static void setupOnce() {
|
public static void setupOnce() {
|
||||||
|
|
||||||
context = new AnnotationConfigApplicationContext(ServerConfig.class);
|
context = new AnnotationConfigApplicationContext(ServerConfig.class);
|
||||||
|
|
||||||
server = RSocketFactory.receive()
|
server = RSocketFactory.receive()
|
||||||
|
@ -246,12 +246,10 @@ public class RSocketBufferLeakTests {
|
||||||
|
|
||||||
private final List<DataBufferLeakInfo> created = new ArrayList<>();
|
private final List<DataBufferLeakInfo> created = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
LeakAwareNettyDataBufferFactory(ByteBufAllocator byteBufAllocator) {
|
LeakAwareNettyDataBufferFactory(ByteBufAllocator byteBufAllocator) {
|
||||||
super(byteBufAllocator);
|
super(byteBufAllocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void checkForLeaks(Duration duration) throws InterruptedException {
|
void checkForLeaks(Duration duration) throws InterruptedException {
|
||||||
Instant start = Instant.now();
|
Instant start = Instant.now();
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -316,7 +314,6 @@ public class RSocketBufferLeakTests {
|
||||||
|
|
||||||
private final AssertionError error;
|
private final AssertionError error;
|
||||||
|
|
||||||
|
|
||||||
DataBufferLeakInfo(DataBuffer dataBuffer, AssertionError error) {
|
DataBufferLeakInfo(DataBuffer dataBuffer, AssertionError error) {
|
||||||
this.dataBuffer = dataBuffer;
|
this.dataBuffer = dataBuffer;
|
||||||
this.error = error;
|
this.error = error;
|
||||||
|
@ -340,7 +337,6 @@ public class RSocketBufferLeakTests {
|
||||||
|
|
||||||
private final List<PayloadSavingDecorator> rsockets = new CopyOnWriteArrayList<>();
|
private final List<PayloadSavingDecorator> rsockets = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
void checkForLeaks() {
|
void checkForLeaks() {
|
||||||
this.rsockets.stream().map(PayloadSavingDecorator::getPayloads)
|
this.rsockets.stream().map(PayloadSavingDecorator::getPayloads)
|
||||||
.forEach(payloadInfoProcessor -> {
|
.forEach(payloadInfoProcessor -> {
|
||||||
|
@ -377,7 +373,6 @@ public class RSocketBufferLeakTests {
|
||||||
this.rsockets.forEach(PayloadSavingDecorator::reset);
|
this.rsockets.forEach(PayloadSavingDecorator::reset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RSocket apply(RSocket rsocket) {
|
public RSocket apply(RSocket rsocket) {
|
||||||
PayloadSavingDecorator decorator = new PayloadSavingDecorator(rsocket);
|
PayloadSavingDecorator decorator = new PayloadSavingDecorator(rsocket);
|
||||||
|
@ -392,12 +387,10 @@ public class RSocketBufferLeakTests {
|
||||||
|
|
||||||
private ReplayProcessor<PayloadLeakInfo> payloads = ReplayProcessor.create();
|
private ReplayProcessor<PayloadLeakInfo> payloads = ReplayProcessor.create();
|
||||||
|
|
||||||
|
|
||||||
PayloadSavingDecorator(RSocket delegate) {
|
PayloadSavingDecorator(RSocket delegate) {
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ReplayProcessor<PayloadLeakInfo> getPayloads() {
|
ReplayProcessor<PayloadLeakInfo> getPayloads() {
|
||||||
return this.payloads;
|
return this.payloads;
|
||||||
}
|
}
|
||||||
|
@ -447,13 +440,11 @@ public class RSocketBufferLeakTests {
|
||||||
|
|
||||||
private final ReferenceCounted referenceCounted;
|
private final ReferenceCounted referenceCounted;
|
||||||
|
|
||||||
|
|
||||||
PayloadLeakInfo(io.rsocket.Payload payload) {
|
PayloadLeakInfo(io.rsocket.Payload payload) {
|
||||||
this.description = payload.toString();
|
this.description = payload.toString();
|
||||||
this.referenceCounted = payload;
|
this.referenceCounted = payload;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int getReferenceCount() {
|
int getReferenceCount() {
|
||||||
return this.referenceCounted.refCnt();
|
return this.referenceCounted.refCnt();
|
||||||
}
|
}
|
||||||
|
@ -463,4 +454,5 @@ public class RSocketBufferLeakTests {
|
||||||
return this.description;
|
return this.description;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
@ -66,7 +67,6 @@ public class RSocketClientToServerIntegrationTests {
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
@SuppressWarnings("ConstantConditions")
|
@SuppressWarnings("ConstantConditions")
|
||||||
public static void setupOnce() {
|
public static void setupOnce() {
|
||||||
|
|
||||||
context = new AnnotationConfigApplicationContext(ServerConfig.class);
|
context = new AnnotationConfigApplicationContext(ServerConfig.class);
|
||||||
|
|
||||||
server = RSocketFactory.receive()
|
server = RSocketFactory.receive()
|
||||||
|
@ -97,7 +97,6 @@ public class RSocketClientToServerIntegrationTests {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void fireAndForget() {
|
public void fireAndForget() {
|
||||||
|
|
||||||
Flux.range(1, 3)
|
Flux.range(1, 3)
|
||||||
.concatMap(i -> requester.route("receive").data("Hello " + i).send())
|
.concatMap(i -> requester.route("receive").data("Hello " + i).send())
|
||||||
.blockLast();
|
.blockLast();
|
||||||
|
@ -191,7 +190,6 @@ public class RSocketClientToServerIntegrationTests {
|
||||||
|
|
||||||
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
|
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
|
||||||
|
|
||||||
|
|
||||||
@MessageMapping("receive")
|
@MessageMapping("receive")
|
||||||
void receive(String payload) {
|
void receive(String payload) {
|
||||||
this.fireForgetPayloads.onNext(payload);
|
this.fireForgetPayloads.onNext(payload);
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.messaging.rsocket;
|
package org.springframework.messaging.rsocket;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
@ -60,7 +61,6 @@ public class RSocketServerToClientIntegrationTests {
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
@SuppressWarnings("ConstantConditions")
|
@SuppressWarnings("ConstantConditions")
|
||||||
public static void setupOnce() {
|
public static void setupOnce() {
|
||||||
|
|
||||||
context = new AnnotationConfigApplicationContext(RSocketConfig.class);
|
context = new AnnotationConfigApplicationContext(RSocketConfig.class);
|
||||||
|
|
||||||
server = RSocketFactory.receive()
|
server = RSocketFactory.receive()
|
||||||
|
@ -215,7 +215,6 @@ public class RSocketServerToClientIntegrationTests {
|
||||||
|
|
||||||
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
|
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
|
||||||
|
|
||||||
|
|
||||||
@MessageMapping("receive")
|
@MessageMapping("receive")
|
||||||
void receive(String payload) {
|
void receive(String payload) {
|
||||||
this.fireForgetPayloads.onNext(payload);
|
this.fireForgetPayloads.onNext(payload);
|
||||||
|
|
|
@ -34,7 +34,6 @@ import java.util.Base64;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
Loading…
Reference in New Issue