Add Reactor Netty WebSocketClient support
Issue: SPR-14527
This commit is contained in:
parent
14068d5274
commit
1243556047
|
|
@ -19,9 +19,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.ipc.netty.NettyInbound;
|
||||||
|
import reactor.ipc.netty.NettyOutbound;
|
||||||
import reactor.ipc.netty.NettyPipeline;
|
import reactor.ipc.netty.NettyPipeline;
|
||||||
import reactor.ipc.netty.http.websocket.WebsocketInbound;
|
|
||||||
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
|
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.web.reactive.socket.CloseStatus;
|
import org.springframework.web.reactive.socket.CloseStatus;
|
||||||
|
|
@ -41,7 +41,7 @@ public class ReactorNettyWebSocketSession
|
||||||
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
|
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
|
||||||
|
|
||||||
|
|
||||||
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
|
public ReactorNettyWebSocketSession(NettyInbound inbound, NettyOutbound outbound,
|
||||||
HandshakeInfo handshakeInfo, NettyDataBufferFactory bufferFactory) {
|
HandshakeInfo handshakeInfo, NettyDataBufferFactory bufferFactory) {
|
||||||
|
|
||||||
super(new WebSocketConnection(inbound, outbound), handshakeInfo, bufferFactory);
|
super(new WebSocketConnection(inbound, outbound), handshakeInfo, bufferFactory);
|
||||||
|
|
@ -50,14 +50,14 @@ public class ReactorNettyWebSocketSession
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<WebSocketMessage> receive() {
|
public Flux<WebSocketMessage> receive() {
|
||||||
WebsocketInbound inbound = getDelegate().getWebsocketInbound();
|
NettyInbound inbound = getDelegate().getInbound();
|
||||||
return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class));
|
return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
|
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
|
||||||
Flux<WebSocketFrame> frameFlux = Flux.from(messages).map(this::toFrame);
|
Flux<WebSocketFrame> frameFlux = Flux.from(messages).map(this::toFrame);
|
||||||
WebsocketOutbound outbound = getDelegate().getWebsocketOutbound();
|
NettyOutbound outbound = getDelegate().getOutbound();
|
||||||
return outbound.options(NettyPipeline.SendOptions::flushOnEach)
|
return outbound.options(NettyPipeline.SendOptions::flushOnEach)
|
||||||
.sendObject(frameFlux)
|
.sendObject(frameFlux)
|
||||||
.then();
|
.then();
|
||||||
|
|
@ -72,24 +72,25 @@ public class ReactorNettyWebSocketSession
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple container for {@link WebsocketInbound} and {@link WebsocketOutbound}.
|
* Simple container for {@link NettyInbound} and {@link NettyOutbound}.
|
||||||
*/
|
*/
|
||||||
public static class WebSocketConnection {
|
public static class WebSocketConnection {
|
||||||
|
|
||||||
private final WebsocketInbound inbound;
|
private final NettyInbound inbound;
|
||||||
|
|
||||||
private final WebsocketOutbound outbound;
|
private final NettyOutbound outbound;
|
||||||
|
|
||||||
public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) {
|
|
||||||
|
public WebSocketConnection(NettyInbound inbound, NettyOutbound outbound) {
|
||||||
this.inbound = inbound;
|
this.inbound = inbound;
|
||||||
this.outbound = outbound;
|
this.outbound = outbound;
|
||||||
}
|
}
|
||||||
|
|
||||||
public WebsocketInbound getWebsocketInbound() {
|
public NettyInbound getInbound() {
|
||||||
return this.inbound;
|
return this.inbound;
|
||||||
}
|
}
|
||||||
|
|
||||||
public WebsocketOutbound getWebsocketOutbound() {
|
public NettyOutbound getOutbound() {
|
||||||
return this.outbound;
|
return this.outbound;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,90 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2016 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.springframework.web.reactive.socket.client;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.ipc.netty.NettyOutbound;
|
||||||
|
import reactor.ipc.netty.http.client.HttpClient;
|
||||||
|
import reactor.ipc.netty.http.client.HttpClientOptions;
|
||||||
|
import reactor.ipc.netty.http.client.HttpClientRequest;
|
||||||
|
|
||||||
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.web.reactive.socket.HandshakeInfo;
|
||||||
|
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||||
|
import org.springframework.web.reactive.socket.WebSocketSession;
|
||||||
|
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link WebSocketClient} based on Reactor Netty.
|
||||||
|
*
|
||||||
|
* @author Rossen Stoyanchev
|
||||||
|
* @since 5.0
|
||||||
|
*/
|
||||||
|
public class ReactorNettyWebSocketClient implements WebSocketClient {
|
||||||
|
|
||||||
|
private final HttpClient httpClient;
|
||||||
|
|
||||||
|
|
||||||
|
public ReactorNettyWebSocketClient() {
|
||||||
|
this.httpClient = HttpClient.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReactorNettyWebSocketClient(Consumer<? super HttpClientOptions> clientOptions) {
|
||||||
|
this.httpClient = HttpClient.create(clientOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Void> execute(URI url, WebSocketHandler handler) {
|
||||||
|
return execute(url, new HttpHeaders(), handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
|
||||||
|
|
||||||
|
// We have to store the NettyOutbound fow now..
|
||||||
|
// The alternative HttpClientResponse#receiveWebSocket does not work at present
|
||||||
|
AtomicReference<NettyOutbound> outboundRef = new AtomicReference<>();
|
||||||
|
|
||||||
|
return this.httpClient
|
||||||
|
.get(url.toString(), request -> {
|
||||||
|
addHeaders(request, headers);
|
||||||
|
NettyOutbound outbound = request.sendWebsocket();
|
||||||
|
outboundRef.set(outbound);
|
||||||
|
return outbound;
|
||||||
|
})
|
||||||
|
.then(inbound -> {
|
||||||
|
ByteBufAllocator allocator = inbound.channel().alloc();
|
||||||
|
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
|
||||||
|
NettyOutbound outbound = outboundRef.get();
|
||||||
|
HandshakeInfo info = new HandshakeInfo(url, headers, Mono.empty());
|
||||||
|
WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
|
||||||
|
return handler.handle(session);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addHeaders(HttpClientRequest request, HttpHeaders headers) {
|
||||||
|
headers.entrySet().stream()
|
||||||
|
.forEach(e -> request.requestHeaders().set(e.getKey(), e.getValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
*/
|
*/
|
||||||
package org.springframework.web.reactive.socket.server;
|
package org.springframework.web.reactive.socket.server;
|
||||||
|
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
@ -29,7 +30,9 @@ import org.springframework.web.reactive.HandlerMapping;
|
||||||
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
|
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
|
||||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||||
import org.springframework.web.reactive.socket.WebSocketSession;
|
import org.springframework.web.reactive.socket.WebSocketSession;
|
||||||
|
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
|
||||||
import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient;
|
import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient;
|
||||||
|
import org.springframework.web.reactive.socket.client.WebSocketClient;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
|
@ -48,13 +51,22 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void echo() throws Exception {
|
public void echoReactorNettyClient() throws Exception {
|
||||||
|
testEcho(new ReactorNettyWebSocketClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void echoRxNettyClient() throws Exception {
|
||||||
|
testEcho(new RxNettyWebSocketClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testEcho(WebSocketClient client) throws URISyntaxException {
|
||||||
int count = 100;
|
int count = 100;
|
||||||
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
|
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
|
||||||
ReplayProcessor<Object> emitter = ReplayProcessor.create(count);
|
ReplayProcessor<Object> output = ReplayProcessor.create(count);
|
||||||
|
|
||||||
new RxNettyWebSocketClient()
|
client.execute(getUrl("/echo"),
|
||||||
.execute(getUrl("/echo"), session -> session
|
session -> session
|
||||||
.send(input.map(session::textMessage))
|
.send(input.map(session::textMessage))
|
||||||
.thenMany(session.receive()
|
.thenMany(session.receive()
|
||||||
.take(count)
|
.take(count)
|
||||||
|
|
@ -63,11 +75,11 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
|
||||||
message.release();
|
message.release();
|
||||||
return text;
|
return text;
|
||||||
}))
|
}))
|
||||||
.subscribeWith(emitter)
|
.subscribeWith(output)
|
||||||
.then())
|
.then())
|
||||||
.blockMillis(5000);
|
.blockMillis(5000);
|
||||||
|
|
||||||
assertEquals(input.collectList().blockMillis(5000), emitter.collectList().blockMillis(5000));
|
assertEquals(input.collectList().blockMillis(5000), output.collectList().blockMillis(5000));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue