Netty support for (Async)RestTemplate

This commit introduces an AsyncClientHttpRequestFactory based on Netty
4, for use with the (Async)RestTemplate.
This commit is contained in:
Arjen Poutsma 2014-10-16 11:46:22 +02:00 committed by Rossen Stoyanchev
parent a13bb69cbe
commit 7de0a70f0c
9 changed files with 608 additions and 21 deletions

View File

@ -672,6 +672,7 @@ project("spring-web") {
optional("commons-fileupload:commons-fileupload:1.3.1")
optional("org.apache.httpcomponents:httpclient:4.3.5")
optional("org.apache.httpcomponents:httpasyncclient:4.0.2")
optional("io.netty:netty-all:4.0.23.Final")
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-xml:${jackson2Version}")
optional("com.google.code.gson:gson:${gsonVersion}")

View File

@ -0,0 +1,186 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpVersion;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
/**
* {@link org.springframework.http.client.ClientHttpRequest} implementation that uses
* Netty 4 to execute requests.
*
* <p>Created via the {@link Netty4ClientHttpRequestFactory}.
*
* @author Arjen Poutsma
* @since 4.2
*/
class Netty4ClientHttpRequest extends AbstractAsyncClientHttpRequest implements ClientHttpRequest {
private final Bootstrap bootstrap;
private final URI uri;
private final HttpMethod method;
private final ByteBufOutputStream body;
Netty4ClientHttpRequest(Bootstrap bootstrap, URI uri, HttpMethod method, int maxRequestSize) {
this.bootstrap = bootstrap;
this.uri = uri;
this.method = method;
this.body = new ByteBufOutputStream(Unpooled.buffer(maxRequestSize));
}
@Override
public HttpMethod getMethod() {
return this.method;
}
@Override
public URI getURI() {
return this.uri;
}
@Override
protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
return body;
}
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers)
throws IOException {
final SettableListenableFuture<ClientHttpResponse> responseFuture =
new SettableListenableFuture<ClientHttpResponse>();
ChannelFutureListener connectionListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
Channel channel = future.channel();
channel.pipeline()
.addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
@Override
protected void channelRead0(
ChannelHandlerContext ctx,
FullHttpResponse msg) throws Exception {
responseFuture
.set(new Netty4ClientHttpResponse(ctx,
msg));
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx,
Throwable cause) throws Exception {
responseFuture.setException(cause);
}
});
FullHttpRequest nettyRequest =
createFullHttpRequest(headers);
channel.writeAndFlush(nettyRequest);
}
else {
responseFuture.setException(future.cause());
}
}
};
bootstrap.connect(uri.getHost(), getPort(uri)).addListener(connectionListener);
return responseFuture;
}
@Override
public ClientHttpResponse execute() throws IOException {
try {
return executeAsync().get();
}
catch (InterruptedException ex) {
throw new IOException(ex.getMessage(), ex);
}
catch (ExecutionException ex) {
if (ex.getCause() instanceof IOException) {
throw (IOException) ex.getCause();
} else {
throw new IOException(ex.getMessage(), ex);
}
}
}
private static int getPort(URI uri) {
int port = uri.getPort();
if (port == -1) {
if ("http".equalsIgnoreCase(uri.getScheme())) {
port = 80;
}
else if ("https".equalsIgnoreCase(uri.getScheme())) {
port = 443;
}
}
return port;
}
private FullHttpRequest createFullHttpRequest(HttpHeaders headers) {
io.netty.handler.codec.http.HttpMethod nettyMethod =
io.netty.handler.codec.http.HttpMethod.valueOf(method.name());
FullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
nettyMethod, this.uri.getRawPath(),
this.body.buffer());
nettyRequest.headers()
.set(io.netty.handler.codec.http.HttpHeaders.Names.HOST, uri.getHost());
nettyRequest.headers()
.set(io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION,
io.netty.handler.codec.http.HttpHeaders.Values.CLOSE);
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
nettyRequest.headers().add(entry.getKey(), entry.getValue());
}
return nettyRequest;
}
}

View File

@ -0,0 +1,159 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import java.io.IOException;
import java.net.URI;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.SslContext;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* {@link org.springframework.http.client.ClientHttpRequestFactory} implementation that
* uses <a href="http://netty.io/">Netty 4</a> to create requests.
*
* <p>Allows to use a pre-configured {@link EventLoopGroup} instance - useful for sharing
* across multiple clients.
*
* @author Arjen Poutsma
* @since 4.2
*/
public class Netty4ClientHttpRequestFactory
implements ClientHttpRequestFactory, AsyncClientHttpRequestFactory,
InitializingBean, DisposableBean {
/**
* The default maximum request size.
* @see #setMaxRequestSize(int)
*/
public static final int DEFAULT_MAX_REQUEST_SIZE = 1024 * 1024 * 10;
private final EventLoopGroup eventLoopGroup;
private final boolean defaultEventLoopGroup;
private SslContext sslContext;
private int maxRequestSize = DEFAULT_MAX_REQUEST_SIZE;
private Bootstrap bootstrap;
/**
* Creates a new {@code Netty4ClientHttpRequestFactory} with a default
* {@link NioEventLoopGroup}.
*/
public Netty4ClientHttpRequestFactory() {
int ioWorkerCount = Runtime.getRuntime().availableProcessors() * 2;
eventLoopGroup = new NioEventLoopGroup(ioWorkerCount);
defaultEventLoopGroup = true;
}
/**
* Creates a new {@code Netty4ClientHttpRequestFactory} with the given
* {@link EventLoopGroup}.
*
* <p><b>NOTE:</b> the given group will <strong>not</strong> be
* {@linkplain EventLoopGroup#shutdownGracefully() shutdown} by this factory; doing
* so becomes the responsibility of the caller.
*/
public Netty4ClientHttpRequestFactory(EventLoopGroup eventLoopGroup) {
Assert.notNull(eventLoopGroup, "'eventLoopGroup' must not be null");
this.eventLoopGroup = eventLoopGroup;
this.defaultEventLoopGroup = false;
}
/**
* Sets the default maximum request size. The default is
* {@link #DEFAULT_MAX_REQUEST_SIZE}.
* @see HttpObjectAggregator#HttpObjectAggregator(int)
*/
public void setMaxRequestSize(int maxRequestSize) {
this.maxRequestSize = maxRequestSize;
}
/**
* Sets the SSL context.
*/
public void setSslContext(SslContext sslContext) {
this.sslContext = sslContext;
}
private Bootstrap getBootstrap() {
if (this.bootstrap == null) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslContext != null) {
pipeline.addLast(sslContext.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(maxRequestSize));
}
});
this.bootstrap = bootstrap;
}
return this.bootstrap;
}
@Override
public void afterPropertiesSet() throws Exception {
getBootstrap();
}
private Netty4ClientHttpRequest createRequestInternal(URI uri, HttpMethod httpMethod) {
return new Netty4ClientHttpRequest(getBootstrap(), uri, httpMethod, maxRequestSize);
}
@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod)
throws IOException {
return createRequestInternal(uri, httpMethod);
}
@Override
public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod)
throws IOException {
return createRequestInternal(uri, httpMethod);
}
@Override
public void destroy() throws InterruptedException {
if (defaultEventLoopGroup) {
// clean up the EventLoopGroup if we created it in the constructor
eventLoopGroup.shutdownGracefully().sync();
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpResponse;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* {@link org.springframework.http.client.ClientHttpResponse} implementation that uses
* Netty 4 to execute requests.
*
* @author Arjen Poutsma
* @since 4.2
*/
class Netty4ClientHttpResponse extends AbstractClientHttpResponse {
private final ChannelHandlerContext context;
private final FullHttpResponse nettyResponse;
private final ByteBufInputStream body;
private HttpHeaders headers;
Netty4ClientHttpResponse(ChannelHandlerContext context,
FullHttpResponse nettyResponse) {
Assert.notNull(context, "'context' must not be null");
Assert.notNull(nettyResponse, "'nettyResponse' must not be null");
this.context = context;
this.nettyResponse = nettyResponse;
this.body = new ByteBufInputStream(this.nettyResponse.content());
this.nettyResponse.retain();
}
@Override
public int getRawStatusCode() throws IOException {
return this.nettyResponse.getStatus().code();
}
@Override
public String getStatusText() throws IOException {
return this.nettyResponse.getStatus().reasonPhrase();
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
HttpHeaders headers = new HttpHeaders();
for (Map.Entry<String, String> entry : this.nettyResponse.headers()) {
headers.add(entry.getKey(), entry.getValue());
}
this.headers = headers;
}
return this.headers;
}
@Override
public InputStream getBody() throws IOException {
return this.body;
}
@Override
public void close() {
this.nettyResponse.release();
this.context.close();
}
}

View File

@ -22,9 +22,12 @@ import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.Future;
import org.junit.After;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
@ -34,8 +37,6 @@ import org.springframework.util.StreamUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import static org.junit.Assert.*;
public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJettyServerTestCase {
protected AsyncClientHttpRequestFactory factory;
@ -49,6 +50,13 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJe
}
}
@After
public final void destroyFactory() throws Exception {
if (factory instanceof DisposableBean) {
((DisposableBean) factory).destroy();
}
}
protected abstract AsyncClientHttpRequestFactory createRequestFactory();
@ -60,7 +68,11 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJe
assertEquals("Invalid HTTP URI", uri, request.getURI());
Future<ClientHttpResponse> futureResponse = request.executeAsync();
ClientHttpResponse response = futureResponse.get();
assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode());
try {
assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode());
} finally {
response.close();
}
}
@Test
@ -74,24 +86,24 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJe
@Override
public void onSuccess(ClientHttpResponse result) {
try {
System.out.println("SUCCESS! " + result.getStatusCode());
System.out.println("Callback: " + System.currentTimeMillis());
System.out.println(Thread.currentThread().getId());
assertEquals("Invalid status code", HttpStatus.NOT_FOUND, result.getStatusCode());
}
catch (IOException ex) {
ex.printStackTrace();
fail(ex.getMessage());
}
}
@Override
public void onFailure(Throwable ex) {
System.out.println("FAILURE: " + ex);
fail(ex.getMessage());
}
});
ClientHttpResponse response = listenableFuture.get();
System.out.println("Main thread: " + System.currentTimeMillis());
assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode());
System.out.println(Thread.currentThread().getId());
try {
assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode());
}
finally {
response.close();
}
}
@Test
@ -129,7 +141,7 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJe
}
}
@Test(expected = IllegalStateException.class)
@Test
public void multipleWrites() throws Exception {
AsyncClientHttpRequest request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), HttpMethod.POST);
final byte[] body = "Hello World".getBytes("UTF-8");
@ -146,13 +158,17 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJe
ClientHttpResponse response = futureResponse.get();
try {
FileCopyUtils.copy(body, request.getBody());
fail("IllegalStateException expected");
}
catch (IllegalStateException ex) {
// expected
}
finally {
response.close();
}
}
@Test(expected = UnsupportedOperationException.class)
@Test
public void headersAfterExecute() throws Exception {
AsyncClientHttpRequest request = factory.createAsyncRequest(new URI(baseUrl + "/echo"), HttpMethod.POST);
request.getHeaders().add("MyHeader", "value");
@ -163,6 +179,10 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJe
ClientHttpResponse response = futureResponse.get();
try {
request.getHeaders().add("MyHeader", "value");
fail("UnsupportedOperationException expected");
}
catch (UnsupportedOperationException ex) {
// expected
}
finally {
response.close();

View File

@ -22,17 +22,20 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Locale;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.StreamingHttpOutputMessage;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.StreamUtils;
import static org.junit.Assert.*;
/** @author Arjen Poutsma */
public abstract class AbstractHttpRequestFactoryTestCase extends
AbstractJettyServerTestCase {
@ -40,10 +43,21 @@ public abstract class AbstractHttpRequestFactoryTestCase extends
protected ClientHttpRequestFactory factory;
@Before
public final void createFactory() {
public final void createFactory() throws Exception {
factory = createRequestFactory();
if (factory instanceof InitializingBean) {
((InitializingBean) factory).afterPropertiesSet();
}
}
@After
public final void destroyFactory() throws Exception {
if (factory instanceof DisposableBean) {
((DisposableBean) factory).destroy();
}
}
protected abstract ClientHttpRequestFactory createRequestFactory();
@Test
@ -53,7 +67,11 @@ public abstract class AbstractHttpRequestFactoryTestCase extends
assertEquals("Invalid HTTP method", HttpMethod.GET, request.getMethod());
assertEquals("Invalid HTTP URI", uri, request.getURI());
ClientHttpResponse response = request.execute();
assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode());
try {
assertEquals("Invalid status code", HttpStatus.NOT_FOUND, response.getStatusCode());
} finally {
response.close();
}
}
@Test

View File

@ -31,12 +31,11 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.AfterClass;
import static org.junit.Assert.assertEquals;
import org.junit.BeforeClass;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.SocketUtils;
import static org.junit.Assert.*;
import org.springframework.util.StreamUtils;
/** @author Arjen Poutsma */
public class AbstractJettyServerTestCase {
@ -147,6 +146,8 @@ public class AbstractJettyServerTestCase {
private void echo(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.setStatus(HttpServletResponse.SC_OK);
response.setContentType(request.getContentType());
response.setContentLength(request.getContentLength());
for (Enumeration<String> e1 = request.getHeaderNames(); e1.hasMoreElements();) {
String headerName = e1.nextElement();
for (Enumeration<String> e2 = request.getHeaders(headerName); e2.hasMoreElements();) {
@ -154,7 +155,7 @@ public class AbstractJettyServerTestCase {
response.addHeader(headerName, headerValue);
}
}
FileCopyUtils.copy(request.getInputStream(), response.getOutputStream());
StreamUtils.copy(request.getInputStream(), response.getOutputStream());
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.http.HttpMethod;
/**
* @author Arjen Poutsma
*/
public class Netty4AsyncClientHttpRequestFactoryTests
extends AbstractAsyncHttpRequestFactoryTestCase {
private static EventLoopGroup eventLoopGroup;
@BeforeClass
public static void createEventLoopGroup() {
eventLoopGroup = new NioEventLoopGroup();
}
@AfterClass
public static void shutdownEventLoopGroup() throws InterruptedException {
eventLoopGroup.shutdownGracefully().sync();
}
@Override
protected AsyncClientHttpRequestFactory createRequestFactory() {
return new Netty4ClientHttpRequestFactory(eventLoopGroup);
}
@Override
@Test
public void httpMethods() throws Exception {
assertHttpMethod("patch", HttpMethod.PATCH);
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.http.HttpMethod;
/**
* @author Arjen Poutsma
*/
public class Netty4ClientHttpRequestFactoryTests
extends AbstractHttpRequestFactoryTestCase {
private static EventLoopGroup eventLoopGroup;
@BeforeClass
public static void createEventLoopGroup() {
eventLoopGroup = new NioEventLoopGroup();
}
@AfterClass
public static void shutdownEventLoopGroup() throws InterruptedException {
eventLoopGroup.shutdownGracefully().sync();
}
@Override
protected ClientHttpRequestFactory createRequestFactory() {
return new Netty4ClientHttpRequestFactory(eventLoopGroup);
}
@Override
@Test
public void httpMethods() throws Exception {
assertHttpMethod("patch", HttpMethod.PATCH);
}
}