Add client Request/Response impl. for RxNetty

This commit adds the `ClientHttpRequest` and `ClientHttpResponse`
implementations for the RxNetty HTTP client.
This client library is based on the `Single` and `Observable`
composition API, so this has to be converted to the `Flux`/`Mono`
variants.
This commit is contained in:
Brian Clozel 2016-02-01 18:44:37 +01:00
parent 255d2de553
commit f63960af0a
3 changed files with 238 additions and 0 deletions

View File

@ -0,0 +1,124 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
/**
* {@link ClientHttpRequest} implementation for the RxNetty HTTP client
*
* @author Brian Clozel
*/
public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
private final NettyDataBufferAllocator allocator;
private final HttpMethod httpMethod;
private final URI uri;
private Observable<ByteBuf> body;
public RxNettyClientHttpRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers, NettyDataBufferAllocator allocator) {
super(headers);
this.httpMethod = httpMethod;
this.uri = uri;
this.allocator = allocator;
}
/**
* Set the body of the message to the given {@link Publisher}.
*
* <p>Since the HTTP channel is not yet created when this method
* is called, the {@code Mono<Void>} return value completes immediately.
* For an event that signals that we're done writing the request, check the
* {@link #execute()} method.
*
* @return a publisher that completes immediately.
* @see #execute()
*/
@Override
public Mono<Void> setBody(Publisher<DataBuffer> body) {
this.body = RxJava1ObservableConverter.from(Flux.from(body)
.map(b -> allocator.wrap(b.asByteBuffer()).getNativeBuffer()));
return Mono.empty();
}
@Override
public HttpMethod getMethod() {
return this.httpMethod;
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public Mono<ClientHttpResponse> execute() {
try {
HttpClientRequest<ByteBuf, ByteBuf> request = HttpClient
.newClient(this.uri.getHost(), this.uri.getPort())
.createRequest(io.netty.handler.codec.http.HttpMethod.valueOf(this.httpMethod.name()), uri.getRawPath());
return applyBeforeCommit()
.after(() -> Mono.just(request))
.map(req -> {
for (Map.Entry<String, List<String>> entry : getHeaders().entrySet()) {
for (String value : entry.getValue()) {
req = req.addHeader(entry.getKey(), value);
}
}
return req;
})
.map(req -> {
if (this.body != null) {
return RxJava1ObservableConverter.from(req.writeContent(this.body));
}
else {
return RxJava1ObservableConverter.from(req);
}
})
.flatMap(resp -> resp)
.next()
.map(response -> new RxNettyClientHttpResponse(response, this.allocator));
}
catch (IllegalArgumentException exc) {
return Mono.error(exc);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
* {@link ClientHttpResponse} implementation for the RxNetty HTTP client
*
* @author Brian Clozel
*/
public class RxNettyClientHttpResponse implements ClientHttpResponse {
private final HttpClientResponse<ByteBuf> response;
private final HttpHeaders headers;
private final NettyDataBufferAllocator allocator;
public RxNettyClientHttpResponse(HttpClientResponse<ByteBuf> response,
NettyDataBufferAllocator allocator) {
Assert.notNull("'request', request must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = allocator;
this.response = response;
this.headers = new HttpHeaders();
this.response.headerIterator().forEachRemaining(e -> this.headers.set(e.getKey(), e.getValue()));
}
@Override
public HttpStatus getStatusCode() {
return HttpStatus.valueOf(this.response.getStatus().code());
}
@Override
public Flux<DataBuffer> getBody() {
return RxJava1ObservableConverter.from(this.response.getContent().map(allocator::wrap));
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* Create a {@link ClientHttpRequestFactory} for the RxNetty HTTP client
*
* @author Brian Clozel
*/
public class RxNettyHttpClientRequestFactory implements ClientHttpRequestFactory {
private final NettyDataBufferAllocator allocator;
public RxNettyHttpClientRequestFactory(NettyDataBufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) {
Assert.notNull(httpMethod, "HTTP method is required");
Assert.notNull(uri, "request URI is required");
Assert.notNull(headers, "request headers are required");
return new RxNettyClientHttpRequest(httpMethod, uri, headers, this.allocator);
}
}