diff --git a/spring-boot-project/spring-boot-autoconfigure/build.gradle b/spring-boot-project/spring-boot-autoconfigure/build.gradle index 684f7717688..59be6a360a1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/build.gradle +++ b/spring-boot-project/spring-boot-autoconfigure/build.gradle @@ -104,6 +104,7 @@ dependencies { optional("org.springframework.integration:spring-integration-core") optional("org.springframework.integration:spring-integration-jdbc") optional("org.springframework.integration:spring-integration-jmx") + optional("org.springframework.integration:spring-integration-rsocket") optional("org.springframework:spring-jms") optional("org.springframework:spring-orm") optional("org.springframework:spring-tx") diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java index 7b9b31ace53..a493add9d28 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,13 @@ package org.springframework.boot.autoconfigure.integration; import javax.management.MBeanServer; import javax.sql.DataSource; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.netty.server.TcpServerTransport; + import org.springframework.beans.factory.BeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -29,8 +34,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandi import org.springframework.boot.autoconfigure.condition.SearchStrategy; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration; +import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.env.Environment; @@ -42,6 +49,14 @@ import org.springframework.integration.gateway.GatewayProxyFactoryBean; import org.springframework.integration.jdbc.store.JdbcMessageStore; import org.springframework.integration.jmx.config.EnableIntegrationMBeanExport; import org.springframework.integration.monitor.IntegrationMBeanExporter; +import org.springframework.integration.rsocket.ClientRSocketConnector; +import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; +import org.springframework.integration.rsocket.ServerRSocketConnector; +import org.springframework.integration.rsocket.ServerRSocketMessageHandler; +import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.util.StringUtils; /** @@ -138,4 +153,100 @@ public class IntegrationAutoConfiguration { } + /** + * Integration RSocket configuration. + */ + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass({ IntegrationRSocketEndpoint.class, RSocketRequester.class, RSocketFactory.class }) + @Conditional(IntegrationRSocketConfiguration.AnyRSocketChannelAdapterAvailable.class) + protected static class IntegrationRSocketConfiguration { + + /** + * Check if either a {@link IntegrationRSocketEndpoint} or + * {@link RSocketOutboundGateway} bean is available. + */ + static class AnyRSocketChannelAdapterAvailable extends AnyNestedCondition { + + AnyRSocketChannelAdapterAvailable() { + super(ConfigurationPhase.REGISTER_BEAN); + } + + @ConditionalOnBean(IntegrationRSocketEndpoint.class) + static class IntegrationRSocketEndpointAvailable { + + } + + @ConditionalOnBean(RSocketOutboundGateway.class) + static class RSocketOutboundGatewayAvailable { + + } + + } + + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(TcpServerTransport.class) + @AutoConfigureBefore(RSocketMessagingAutoConfiguration.class) + protected static class IntegrationRSocketServerConfiguration { + + @Bean + @ConditionalOnMissingBean(ServerRSocketMessageHandler.class) + public RSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies, + IntegrationProperties integrationProperties) { + + RSocketMessageHandler messageHandler = new ServerRSocketMessageHandler( + integrationProperties.getRSocket().getServer().isMessageMappingEnabled()); + messageHandler.setRSocketStrategies(rSocketStrategies); + return messageHandler; + } + + @Bean + @ConditionalOnMissingBean + public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler messageHandler) { + return new ServerRSocketConnector(messageHandler); + } + + } + + @Configuration(proxyBeanMethods = false) + protected static class IntegrationRSocketClientConfiguration { + + @Bean + @ConditionalOnMissingBean + @Conditional(RemoteRSocketServerAddressConfigured.class) + public ClientRSocketConnector clientRSocketConnector(IntegrationProperties integrationProperties, + RSocketStrategies rSocketStrategies) { + + IntegrationProperties.RSocket.Client client = integrationProperties.getRSocket().getClient(); + ClientRSocketConnector clientRSocketConnector = (client.getUri() != null) + ? new ClientRSocketConnector(client.getUri()) + : new ClientRSocketConnector(client.getHost(), client.getPort()); + clientRSocketConnector.setRSocketStrategies(rSocketStrategies); + return clientRSocketConnector; + } + + /** + * Check if a remote address is configured for the RSocket Integration client. + */ + static class RemoteRSocketServerAddressConfigured extends AnyNestedCondition { + + RemoteRSocketServerAddressConfigured() { + super(ConfigurationPhase.REGISTER_BEAN); + } + + @ConditionalOnProperty(prefix = "spring.integration.rsocket.client", name = "uri") + static class WebSocketAddressConfigured { + + } + + @ConditionalOnProperty(prefix = "spring.integration.rsocket.client", name = { "host", "port" }) + static class TcpAddressConfigured { + + } + + } + + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java index b27c1d9c1b3..a936e926344 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java @@ -16,6 +16,8 @@ package org.springframework.boot.autoconfigure.integration; +import java.net.URI; + import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceInitializationMode; @@ -24,6 +26,7 @@ import org.springframework.boot.jdbc.DataSourceInitializationMode; * * @author Vedran Pavic * @author Stephane Nicoll + * @author Artem Bilan * @since 2.0.0 */ @ConfigurationProperties(prefix = "spring.integration") @@ -31,10 +34,16 @@ public class IntegrationProperties { private final Jdbc jdbc = new Jdbc(); + private final RSocket rsocket = new RSocket(); + public Jdbc getJdbc() { return this.jdbc; } + public RSocket getRSocket() { + return this.rsocket; + } + public static class Jdbc { private static final String DEFAULT_SCHEMA_LOCATION = "classpath:org/springframework/" @@ -68,4 +77,80 @@ public class IntegrationProperties { } + public static class RSocket { + + private final Client client = new Client(); + + private final Server server = new Server(); + + public Client getClient() { + return this.client; + } + + public Server getServer() { + return this.server; + } + + public static class Client { + + /** + * TCP RSocket server host to connect to. + */ + private String host; + + /** + * TCP RSocket server port to connect to. + */ + private Integer port; + + /** + * WebSocket RSocket server uri to connect to. + */ + private URI uri; + + public void setHost(String host) { + this.host = host; + } + + public String getHost() { + return this.host; + } + + public void setPort(Integer port) { + this.port = port; + } + + public Integer getPort() { + return this.port; + } + + public void setUri(URI uri) { + this.uri = uri; + } + + public URI getUri() { + return this.uri; + } + + } + + public static class Server { + + /** + * Whether to handle message mapping for RSocket via Spring Integration. + */ + boolean messageMappingEnabled; + + public boolean isMessageMappingEnabled() { + return this.messageMappingEnabled; + } + + public void setMessageMappingEnabled(boolean messageMappingEnabled) { + this.messageMappingEnabled = messageMappingEnabled; + } + + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java index b006eb048fc..a6932d7205c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,14 +18,22 @@ package org.springframework.boot.autoconfigure.integration; import javax.management.MBeanServer; +import io.rsocket.transport.ClientTransport; +import io.rsocket.transport.netty.client.TcpClientTransport; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration.IntegrationComponentScanConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration; import org.springframework.boot.autoconfigure.jdbc.JdbcTemplateAutoConfiguration; import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration; +import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration; +import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfiguration; +import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration; +import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration; import org.springframework.boot.jdbc.DataSourceInitializationMode; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; @@ -38,10 +46,16 @@ import org.springframework.integration.core.MessageSource; import org.springframework.integration.endpoint.MessageProcessorMessageSource; import org.springframework.integration.gateway.RequestReplyExchanger; import org.springframework.integration.handler.MessageProcessor; +import org.springframework.integration.rsocket.ClientRSocketConnector; +import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; +import org.springframework.integration.rsocket.ServerRSocketConnector; +import org.springframework.integration.rsocket.ServerRSocketMessageHandler; import org.springframework.integration.support.channel.HeaderChannelRegistry; import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jmx.export.MBeanExporter; +import org.springframework.messaging.Message; +import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -188,6 +202,33 @@ class IntegrationAutoConfigurationTests { }); } + @Test + void rsocketSupportEnabled() { + this.contextRunner.withUserConfiguration(RSocketServerConfiguration.class) + .withConfiguration(AutoConfigurations.of(RSocketServerAutoConfiguration.class, + RSocketStrategiesAutoConfiguration.class, RSocketMessagingAutoConfiguration.class, + RSocketRequesterAutoConfiguration.class, IntegrationAutoConfiguration.class)) + .withPropertyValues("spring.rsocket.server.port=0", "spring.integration.rsocket.client.port=0", + "spring.integration.rsocket.client.host=localhost", + "spring.integration.rsocket.server.message-mapping-enabled=true") + .run((context) -> { + assertThat(context).hasSingleBean(ClientRSocketConnector.class).hasBean("clientRSocketConnector") + .hasSingleBean(ServerRSocketConnector.class) + .hasSingleBean(ServerRSocketMessageHandler.class) + .hasSingleBean(RSocketMessageHandler.class); + + ServerRSocketMessageHandler serverRSocketMessageHandler = context + .getBean(ServerRSocketMessageHandler.class); + assertThat(context).getBean(RSocketMessageHandler.class).isSameAs(serverRSocketMessageHandler); + + ClientRSocketConnector clientRSocketConnector = context.getBean(ClientRSocketConnector.class); + ClientTransport clientTransport = (ClientTransport) new DirectFieldAccessor(clientRSocketConnector) + .getPropertyValue("clientTransport"); + + assertThat(clientTransport).isInstanceOf(TcpClientTransport.class); + }); + } + @Configuration(proxyBeanMethods = false) static class CustomMBeanExporter { @@ -220,4 +261,26 @@ class IntegrationAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class RSocketServerConfiguration { + + @Bean + IntegrationRSocketEndpoint mockIntegrationRSocketEndpoint() { + return new IntegrationRSocketEndpoint() { + + @Override + public Mono handleMessage(Message message) { + return null; + } + + @Override + public String[] getPath() { + return new String[] { "/rsocketTestPath" }; + } + + }; + } + + } + } diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc index e27c712a178..1f5191db4bd 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc @@ -5932,6 +5932,21 @@ If `spring-integration-jdbc` is available, the default database schema can be cr spring.integration.jdbc.initialize-schema=always ---- +If `spring-integration-rsocket` is available, developers can configure an RSocket server using `"spring.rsocket.server.*"` properties and let it use `IntegrationRSocketEndpoint` or `RSocketOutboundGateway` components to handle incoming RSocket messages. +This infrastructure can handle Spring Integration RSocket channel adapters and `@MessageMapping` handlers (given `"spring.integration.rsocket.server.message-mapping-enabled"` is configured). + +Spring Boot can also auto-configure an `ClientRSocketConnector` using configuration properties: + +[source,properties,indent=0,configprops] +---- + # Connecting to a RSocket server over TCP + spring.integration.rsocket.client.host=example.org + spring.integration.rsocket.client.port=9898 + + # Connecting to a RSocket Server over WebSocket + spring.integration.rsocket.client.uri=ws://example.org +---- + See the {spring-boot-autoconfigure-module-code}/integration/IntegrationAutoConfiguration.java[`IntegrationAutoConfiguration`] and {spring-boot-autoconfigure-module-code}/integration/IntegrationProperties.java[`IntegrationProperties`] classes for more details. By default, if a Micrometer `meterRegistry` bean is present, Spring Integration metrics will be managed by Micrometer.