Add metadataPush support to RSocketRequester

Closes gh-24322
This commit is contained in:
Rossen Stoyanchev 2020-05-14 13:54:45 +01:00
parent de378599d3
commit b1224835be
5 changed files with 116 additions and 84 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -248,6 +248,11 @@ final class DefaultRSocketRequester implements RSocketRequester {
.doOnDiscard(Payload.class, Payload::release);
}
@Override
public Mono<Void> sendMetadata() {
return getPayloadMono().flatMap(rsocket::metadataPush);
}
@Override
public Mono<Void> send() {
return getPayloadMono().flatMap(rsocket::fireAndForget);

View File

@ -276,6 +276,12 @@ public interface RSocketRequester {
*/
RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);
/**
* Perform a {@link RSocket#metadataPush(Payload) metadataPush}.
* @since 5.3
*/
Mono<Void> sendMetadata();
/**
* Provide payload data for the request. This can be one of:
* <ul>
@ -344,7 +350,12 @@ public interface RSocketRequester {
interface RetrieveSpec {
/**
* Perform a {@link RSocket#fireAndForget fireAndForget}.
* Perform a {@link RSocket#fireAndForget fireAndForget} sending the
* provided data and metadata.
* @return a completion that indicates if the payload was sent
* successfully or not. Note, however that is a one-way send and there
* is no indication of whether or how the even was handled on the
* remote end.
*/
Mono<Void> send();

View File

@ -1,77 +0,0 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.util.RSocketProxy;
import reactor.core.publisher.Mono;
/**
* Intercept received RSockets and count successfully completed requests seen
* on the server side. This is useful for verifying fire-and-forget
* interactions.
*
* @author Rossen Stoyanchev
*/
class FireAndForgetCountingInterceptor extends AbstractRSocket implements RSocketInterceptor {
private final List<CountingDecorator> rsockets = new CopyOnWriteArrayList<>();
public int getRSocketCount() {
return this.rsockets.size();
}
public int getFireAndForgetCount(int index) {
return this.rsockets.get(index).getFireAndForgetCount();
}
@Override
public RSocket apply(RSocket rsocket) {
CountingDecorator decorator = new CountingDecorator(rsocket);
this.rsockets.add(decorator);
return decorator;
}
private static class CountingDecorator extends RSocketProxy {
private final AtomicInteger fireAndForget = new AtomicInteger(0);
CountingDecorator(RSocket delegate) {
super(delegate);
}
public int getFireAndForgetCount() {
return this.fireAndForget.get();
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
return super.fireAndForget(payload).doOnSuccess(aVoid -> this.fireAndForget.incrementAndGet());
}
}
}

View File

@ -17,16 +17,21 @@
package org.springframework.messaging.rsocket;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
@ -35,10 +40,13 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.stereotype.Controller;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@ -51,11 +59,14 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class RSocketClientToServerIntegrationTests {
private static final MimeType FOO_MIME_TYPE = MimeTypeUtils.parseMimeType("messaging/x.foo");
private static AnnotationConfigApplicationContext context;
private static CloseableChannel server;
private static FireAndForgetCountingInterceptor interceptor = new FireAndForgetCountingInterceptor();
private static CountingInterceptor interceptor = new CountingInterceptor();
private static RSocketRequester requester;
@ -65,7 +76,7 @@ public class RSocketClientToServerIntegrationTests {
public static void setupOnce() {
MimeType metadataMimeType = MimeTypeUtils.parseMimeType(
WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString());
WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
@ -105,8 +116,7 @@ public class RSocketClientToServerIntegrationTests {
.thenCancel()
.verify(Duration.ofSeconds(5));
assertThat(interceptor.getRSocketCount()).isEqualTo(1);
assertThat(interceptor.getFireAndForgetCount(0))
assertThat(interceptor.getFireAndForgetCount())
.as("Fire and forget requests did not actually complete handling on the server side")
.isEqualTo(3);
}
@ -155,6 +165,24 @@ public class RSocketClientToServerIntegrationTests {
.verify(Duration.ofSeconds(5));
}
@Test
public void metadataPush() {
Flux.just("bar", "baz")
.concatMap(s -> requester.route("foo-updates").metadata(s, FOO_MIME_TYPE).sendMetadata())
.blockLast();
StepVerifier.create(context.getBean(ServerController.class).metadataPushPayloads)
.expectNext("bar")
.expectNext("baz")
.thenAwait(Duration.ofMillis(50))
.thenCancel()
.verify(Duration.ofSeconds(5));
assertThat(interceptor.getMetadataPushCount())
.as("Metadata pushes did not actually complete handling on the server side")
.isEqualTo(2);
}
@Test
public void voidReturnValue() {
Mono<String> result = requester.route("void-return-value").data("Hello").retrieveMono(String.class);
@ -199,6 +227,9 @@ public class RSocketClientToServerIntegrationTests {
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
final ReplayProcessor<String> metadataPushPayloads = ReplayProcessor.create();
@MessageMapping("receive")
void receive(String payload) {
this.fireForgetPayloads.onNext(payload);
@ -241,6 +272,11 @@ public class RSocketClientToServerIntegrationTests {
Mono.error(new IllegalStateException("bad"));
}
@ConnectMapping("foo-updates")
public void handleMetadata(@Header("foo") String foo) {
this.metadataPushPayloads.onNext(foo);
}
@MessageExceptionHandler
Mono<String> handleException(IllegalArgumentException ex) {
return Mono.delay(Duration.ofMillis(10)).map(aLong -> ex.getMessage() + " handled");
@ -270,8 +306,63 @@ public class RSocketClientToServerIntegrationTests {
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.create();
return RSocketStrategies.builder()
.metadataExtractorRegistry(registry ->
registry.metadataToExtract(FOO_MIME_TYPE, String.class, "foo"))
.build();
}
}
private static class CountingInterceptor implements RSocket, RSocketInterceptor {
private RSocket delegate;
private final AtomicInteger fireAndForgetCount = new AtomicInteger(0);
private final AtomicInteger metadataPushCount = new AtomicInteger(0);
public int getFireAndForgetCount() {
return this.fireAndForgetCount.get();
}
public int getMetadataPushCount() {
return this.metadataPushCount.get();
}
@Override
public RSocket apply(RSocket rsocket) {
Assert.isNull(this.delegate, "Unexpected RSocket connection");
this.delegate = rsocket;
return this;
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
return this.delegate.fireAndForget(payload)
.doOnSuccess(aVoid -> this.fireAndForgetCount.incrementAndGet());
}
@Override
public Mono<Void> metadataPush(Payload payload) {
return this.delegate.metadataPush(payload)
.doOnSuccess(aVoid -> this.metadataPushCount.incrementAndGet());
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
return this.delegate.requestResponse(payload);
}
@Override
public Flux<Payload> requestStream(Payload payload) {
return this.delegate.requestStream(payload);
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return this.delegate.requestChannel(payloads);
}
}
}

View File

@ -582,6 +582,8 @@ values are supported by a registered `Encoder`. For example:
For `Fire-and-Forget` use the `send()` method that returns `Mono<Void>`. Note that the `Mono`
indicates only that the message was successfully sent, and not that it was handled.
For `Metadata-Push` use the `sendMetadata()` method with a `Mono<Void>` return value.
[[rsocket-annot-responders]]