Polishing

(cherry picked from commit 404e7cd)
This commit is contained in:
Juergen Hoeller 2016-09-13 23:31:09 +02:00
parent fbe7ddb640
commit 54db496815
5 changed files with 27 additions and 31 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -145,7 +145,7 @@ public class PropertyValue extends BeanMetadataAttributeAccessor implements Seri
} }
/** /**
* Reeurn whether this is an optional value, that is, to be ignored * Return whether this is an optional value, that is, to be ignored
* when no corresponding property exists on the target class. * when no corresponding property exists on the target class.
* @since 3.0 * @since 3.0
*/ */

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,14 +18,14 @@ package org.springframework.messaging.simp.stomp;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.fn.Consumer; import reactor.fn.Consumer;
import reactor.fn.Function; import reactor.fn.Function;
import reactor.io.buffer.Buffer; import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec; import reactor.io.codec.Codec;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/** /**
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages. * A Reactor TCP {@link Codec} for sending and receiving STOMP messages.
* *
@ -35,25 +35,23 @@ import reactor.io.codec.Codec;
*/ */
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> { public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {
private final StompDecoder stompDecoder;
private final StompEncoder stompEncoder;
private final Function<Message<byte[]>, Buffer> encodingFunction; private final Function<Message<byte[]>, Buffer> encodingFunction;
private final StompDecoder stompDecoder;
public Reactor2StompCodec() { public Reactor2StompCodec() {
this(new StompEncoder(), new StompDecoder()); this(new StompEncoder(), new StompDecoder());
} }
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) { public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) {
Assert.notNull(encoder, "'encoder' is required"); Assert.notNull(encoder, "StompEncoder is required");
Assert.notNull(decoder, "'decoder' is required"); Assert.notNull(decoder, "StompDecoder is required");
this.stompEncoder = encoder; this.encodingFunction = new EncodingFunction(encoder);
this.stompDecoder = decoder; this.stompDecoder = decoder;
this.encodingFunction = new EncodingFunction(this.stompEncoder);
} }
@Override @Override
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) { public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
return new DecodingFunction(this.stompDecoder, messageConsumer); return new DecodingFunction(this.stompDecoder, messageConsumer);
@ -66,14 +64,15 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
@Override @Override
public Buffer apply(Message<byte[]> message) { public Buffer apply(Message<byte[]> message) {
return encodingFunction.apply(message); return this.encodingFunction.apply(message);
} }
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> { private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
private final StompEncoder encoder; private final StompEncoder encoder;
private EncodingFunction(StompEncoder encoder) { public EncodingFunction(StompEncoder encoder) {
this.encoder = encoder; this.encoder = encoder;
} }
@ -84,6 +83,7 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
} }
} }
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> { private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
private final StompDecoder decoder; private final StompDecoder decoder;
@ -103,4 +103,5 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
return null; return null;
} }
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,7 +16,7 @@
package org.springframework.messaging.simp.stomp; package org.springframework.messaging.simp.stomp;
import java.util.Arrays; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
@ -114,7 +114,7 @@ public class Reactor2TcpStompClient extends StompClientSupport {
String dispatcherName = "StompClient"; String dispatcherName = "StompClient";
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP; DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0); DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
List<DispatcherConfiguration> configList = Arrays.<DispatcherConfiguration>asList(config); List<DispatcherConfiguration> configList = Collections.<DispatcherConfiguration>singletonList(config);
return new ReactorConfiguration(configList, dispatcherName, new Properties()); return new ReactorConfiguration(configList, dispatcherName, new Properties());
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -151,15 +151,13 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
try { try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
} }
catch (Exception ex) { catch (Throwable ex) {
ioThreadCount = -1; ioThreadCount = -1;
} }
if (ioThreadCount <= 0l) { if (ioThreadCount <= 0) {
ioThreadCount = Runtime.getRuntime().availableProcessors(); ioThreadCount = Runtime.getRuntime().availableProcessors();
} }
return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
return new NioEventLoopGroup(ioThreadCount,
new NamedDaemonThreadFactory("reactor-tcp-io"));
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -29,10 +29,9 @@ import org.springframework.util.concurrent.ListenableFuture;
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection * An implementation of {@link org.springframework.messaging.tcp.TcpConnection
* TcpConnection} based on the TCP client support of the Reactor project. * TcpConnection} based on the TCP client support of the Reactor project.
* *
* @param <P> the payload type of messages read or written to the TCP stream.
*
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.2 * @since 4.2
* @param <P> the payload type of messages read or written to the TCP stream.
*/ */
public class Reactor2TcpConnection<P> implements TcpConnection<P> { public class Reactor2TcpConnection<P> implements TcpConnection<P> {
@ -41,9 +40,7 @@ public class Reactor2TcpConnection<P> implements TcpConnection<P> {
private final Promise<Void> closePromise; private final Promise<Void> closePromise;
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, Promise<Void> closePromise) {
Promise<Void> closePromise) {
this.channelStream = channelStream; this.channelStream = channelStream;
this.closePromise = closePromise; this.closePromise = closePromise;
} }