Polishing

(cherry picked from commit 404e7cd)
This commit is contained in:
Juergen Hoeller 2016-09-13 23:31:09 +02:00
parent fbe7ddb640
commit 54db496815
5 changed files with 27 additions and 31 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -145,7 +145,7 @@ public class PropertyValue extends BeanMetadataAttributeAccessor implements Seri
}
/**
* Reeurn whether this is an optional value, that is, to be ignored
* Return whether this is an optional value, that is, to be ignored
* when no corresponding property exists on the target class.
* @since 3.0
*/

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,14 +18,14 @@ package org.springframework.messaging.simp.stomp;
import java.nio.ByteBuffer;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages.
*
@ -35,25 +35,23 @@ import reactor.io.codec.Codec;
*/
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {
private final StompDecoder stompDecoder;
private final StompEncoder stompEncoder;
private final Function<Message<byte[]>, Buffer> encodingFunction;
private final StompDecoder stompDecoder;
public Reactor2StompCodec() {
this(new StompEncoder(), new StompDecoder());
}
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) {
Assert.notNull(encoder, "'encoder' is required");
Assert.notNull(decoder, "'decoder' is required");
this.stompEncoder = encoder;
Assert.notNull(encoder, "StompEncoder is required");
Assert.notNull(decoder, "StompDecoder is required");
this.encodingFunction = new EncodingFunction(encoder);
this.stompDecoder = decoder;
this.encodingFunction = new EncodingFunction(this.stompEncoder);
}
@Override
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
return new DecodingFunction(this.stompDecoder, messageConsumer);
@ -66,14 +64,15 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
@Override
public Buffer apply(Message<byte[]> message) {
return encodingFunction.apply(message);
return this.encodingFunction.apply(message);
}
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
private final StompEncoder encoder;
private EncodingFunction(StompEncoder encoder) {
public EncodingFunction(StompEncoder encoder) {
this.encoder = encoder;
}
@ -84,6 +83,7 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
}
}
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
private final StompDecoder decoder;
@ -103,4 +103,5 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
return null;
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,7 @@
package org.springframework.messaging.simp.stomp;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@ -114,7 +114,7 @@ public class Reactor2TcpStompClient extends StompClientSupport {
String dispatcherName = "StompClient";
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
List<DispatcherConfiguration> configList = Arrays.<DispatcherConfiguration>asList(config);
List<DispatcherConfiguration> configList = Collections.<DispatcherConfiguration>singletonList(config);
return new ReactorConfiguration(configList, dispatcherName, new Properties());
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -151,15 +151,13 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
}
catch (Exception ex) {
catch (Throwable ex) {
ioThreadCount = -1;
}
if (ioThreadCount <= 0l) {
if (ioThreadCount <= 0) {
ioThreadCount = Runtime.getRuntime().availableProcessors();
}
return new NioEventLoopGroup(ioThreadCount,
new NamedDaemonThreadFactory("reactor-tcp-io"));
return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,10 +29,9 @@ import org.springframework.util.concurrent.ListenableFuture;
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection
* TcpConnection} based on the TCP client support of the Reactor project.
*
* @param <P> the payload type of messages read or written to the TCP stream.
*
* @author Rossen Stoyanchev
* @since 4.2
* @param <P> the payload type of messages read or written to the TCP stream.
*/
public class Reactor2TcpConnection<P> implements TcpConnection<P> {
@ -41,9 +40,7 @@ public class Reactor2TcpConnection<P> implements TcpConnection<P> {
private final Promise<Void> closePromise;
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream,
Promise<Void> closePromise) {
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, Promise<Void> closePromise) {
this.channelStream = channelStream;
this.closePromise = closePromise;
}