parent
fbe7ddb640
commit
54db496815
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -145,7 +145,7 @@ public class PropertyValue extends BeanMetadataAttributeAccessor implements Seri
|
|||
}
|
||||
|
||||
/**
|
||||
* Reeurn whether this is an optional value, that is, to be ignored
|
||||
* Return whether this is an optional value, that is, to be ignored
|
||||
* when no corresponding property exists on the target class.
|
||||
* @since 3.0
|
||||
*/
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -18,14 +18,14 @@ package org.springframework.messaging.simp.stomp;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
import reactor.fn.Consumer;
|
||||
import reactor.fn.Function;
|
||||
import reactor.io.buffer.Buffer;
|
||||
import reactor.io.codec.Codec;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages.
|
||||
*
|
||||
|
@ -35,25 +35,23 @@ import reactor.io.codec.Codec;
|
|||
*/
|
||||
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {
|
||||
|
||||
private final StompDecoder stompDecoder;
|
||||
|
||||
private final StompEncoder stompEncoder;
|
||||
|
||||
private final Function<Message<byte[]>, Buffer> encodingFunction;
|
||||
|
||||
private final StompDecoder stompDecoder;
|
||||
|
||||
|
||||
public Reactor2StompCodec() {
|
||||
this(new StompEncoder(), new StompDecoder());
|
||||
}
|
||||
|
||||
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) {
|
||||
Assert.notNull(encoder, "'encoder' is required");
|
||||
Assert.notNull(decoder, "'decoder' is required");
|
||||
this.stompEncoder = encoder;
|
||||
Assert.notNull(encoder, "StompEncoder is required");
|
||||
Assert.notNull(decoder, "StompDecoder is required");
|
||||
this.encodingFunction = new EncodingFunction(encoder);
|
||||
this.stompDecoder = decoder;
|
||||
this.encodingFunction = new EncodingFunction(this.stompEncoder);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
|
||||
return new DecodingFunction(this.stompDecoder, messageConsumer);
|
||||
|
@ -66,14 +64,15 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
|
|||
|
||||
@Override
|
||||
public Buffer apply(Message<byte[]> message) {
|
||||
return encodingFunction.apply(message);
|
||||
return this.encodingFunction.apply(message);
|
||||
}
|
||||
|
||||
|
||||
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
|
||||
|
||||
private final StompEncoder encoder;
|
||||
|
||||
private EncodingFunction(StompEncoder encoder) {
|
||||
public EncodingFunction(StompEncoder encoder) {
|
||||
this.encoder = encoder;
|
||||
}
|
||||
|
||||
|
@ -84,6 +83,7 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
|
||||
|
||||
private final StompDecoder decoder;
|
||||
|
@ -103,4 +103,5 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -16,7 +16,7 @@
|
|||
|
||||
package org.springframework.messaging.simp.stomp;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
|
@ -114,7 +114,7 @@ public class Reactor2TcpStompClient extends StompClientSupport {
|
|||
String dispatcherName = "StompClient";
|
||||
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
|
||||
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
|
||||
List<DispatcherConfiguration> configList = Arrays.<DispatcherConfiguration>asList(config);
|
||||
List<DispatcherConfiguration> configList = Collections.<DispatcherConfiguration>singletonList(config);
|
||||
return new ReactorConfiguration(configList, dispatcherName, new Properties());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -151,15 +151,13 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
|
|||
try {
|
||||
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
|
||||
}
|
||||
catch (Exception ex) {
|
||||
catch (Throwable ex) {
|
||||
ioThreadCount = -1;
|
||||
}
|
||||
if (ioThreadCount <= 0l) {
|
||||
if (ioThreadCount <= 0) {
|
||||
ioThreadCount = Runtime.getRuntime().availableProcessors();
|
||||
}
|
||||
|
||||
return new NioEventLoopGroup(ioThreadCount,
|
||||
new NamedDaemonThreadFactory("reactor-tcp-io"));
|
||||
return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -29,10 +29,9 @@ import org.springframework.util.concurrent.ListenableFuture;
|
|||
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection
|
||||
* TcpConnection} based on the TCP client support of the Reactor project.
|
||||
*
|
||||
* @param <P> the payload type of messages read or written to the TCP stream.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.2
|
||||
* @param <P> the payload type of messages read or written to the TCP stream.
|
||||
*/
|
||||
public class Reactor2TcpConnection<P> implements TcpConnection<P> {
|
||||
|
||||
|
@ -41,9 +40,7 @@ public class Reactor2TcpConnection<P> implements TcpConnection<P> {
|
|||
private final Promise<Void> closePromise;
|
||||
|
||||
|
||||
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream,
|
||||
Promise<Void> closePromise) {
|
||||
|
||||
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, Promise<Void> closePromise) {
|
||||
this.channelStream = channelStream;
|
||||
this.closePromise = closePromise;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue