Add client Request/Response impl. for Reactor-Net

This commit introduces the `ClientHttpRequest` and `ClientHttpResponse`
implementations for the Reactor-Net HTTP client. This client is already
based on the `Flux` and `Mono` contracts.

This commit also adds a `AbstractClientHttpRequest` to support the
`ClientHttpRequest` implementations; it mirrors the
`AbstractServerHttpResponse` contract with a `beforeCommit` to register
`Supplier`s that should be notified before the request is committed.
This commit is contained in:
Brian Clozel 2016-02-01 18:29:13 +01:00
parent a6469baa4f
commit 255d2de553
4 changed files with 342 additions and 0 deletions

View File

@ -0,0 +1,88 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* Base class for {@link ClientHttpRequest} implementations.
*
* @author Rossen Stoyanchev
* @author Brian Clozel
*/
public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
private final HttpHeaders headers;
private AtomicReference<State> state = new AtomicReference<>(State.NEW);
private final List<Supplier<? extends Mono<Void>>> beforeCommitActions = new ArrayList<>(4);
public AbstractClientHttpRequest(HttpHeaders httpHeaders) {
if (httpHeaders == null) {
this.headers = new HttpHeaders();
}
else {
this.headers = httpHeaders;
}
}
@Override
public HttpHeaders getHeaders() {
if (State.COMITTED.equals(this.state.get())) {
return HttpHeaders.readOnlyHttpHeaders(this.headers);
}
return this.headers;
}
protected Mono<Void> applyBeforeCommit() {
Mono<Void> mono = Mono.empty();
if (this.state.compareAndSet(State.NEW, State.COMMITTING)) {
for (Supplier<? extends Mono<Void>> action : this.beforeCommitActions) {
mono = mono.after(() -> action.get());
}
return mono
.otherwise(ex -> {
// Ignore errors from beforeCommit actions
return Mono.empty();
})
.after(() -> {
this.state.set(State.COMITTED);
//writeHeaders();
//writeCookies();
return Mono.empty();
});
}
return mono;
}
@Override
public void beforeCommit(Supplier<? extends Mono<Void>> action) {
Assert.notNull(action);
this.beforeCommitActions.add(action);
}
private enum State {NEW, COMMITTING, COMITTED}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpClient;
import reactor.io.net.http.model.Method;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
/**
* {@link ClientHttpRequest} implementation for the Reactor Net HTTP client
*
* @author Brian Clozel
* @see HttpClient
*/
public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
private final DataBufferAllocator allocator;
private final HttpMethod httpMethod;
private final URI uri;
private final HttpClient<Buffer, Buffer> httpClient;
private Flux<Buffer> body;
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers,
DataBufferAllocator allocator) {
super(headers);
this.allocator = allocator;
this.httpMethod = httpMethod;
this.uri = uri;
this.httpClient = httpClient;
}
@Override
public HttpMethod getMethod() {
return this.httpMethod;
}
@Override
public URI getURI() {
return this.uri;
}
/**
* Set the body of the message to the given {@link Publisher}.
*
* <p>Since the HTTP channel is not yet created when this method
* is called, the {@code Mono<Void>} return value completes immediately.
* For an event that signals that we're done writing the request, check the
* {@link #execute()} method.
*
* @return a publisher that completes immediately.
* @see #execute()
*/
@Override
public Mono<Void> setBody(Publisher<DataBuffer> body) {
this.body = Flux.from(body).map(b -> new Buffer(b.asByteBuffer()));
return Mono.empty();
}
@Override
public Mono<ClientHttpResponse> execute() {
return this.httpClient.request(new Method(httpMethod.toString()), uri.toString(),
channel -> {
// see https://github.com/reactor/reactor-io/pull/8
if (body == null) {
channel.headers().removeTransferEncodingChunked();
}
return applyBeforeCommit()
.after(() ->
{
getHeaders().entrySet().stream()
.forEach(e -> channel.headers().set(e.getKey(), e.getValue()));
return Mono.empty();
}
)
.after(() -> {
if (body != null) {
return channel.writeBufferWith(body);
}
else {
return channel.writeHeaders();
}
});
})
.map(httpChannel -> new ReactorClientHttpResponse(httpChannel, allocator));
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.nio.ByteBuffer;
import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
/**
* {@link ClientHttpResponse} implementation for the Reactor Net HTTP client
*
* @author Brian Clozel
* @see reactor.io.net.http.HttpClient
*/
public class ReactorClientHttpResponse implements ClientHttpResponse {
private final DataBufferAllocator allocator;
private final HttpChannel<Buffer, ByteBuffer> channel;
public ReactorClientHttpResponse(HttpChannel channel, DataBufferAllocator allocator) {
this.allocator = allocator;
this.channel = channel;
}
@Override
public Flux<DataBuffer> getBody() {
return Flux.from(channel.input()).map(b -> allocator.wrap(b.byteBuffer()));
}
@Override
public HttpHeaders getHeaders() {
HttpHeaders headers = new HttpHeaders();
this.channel.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue()));
return headers;
}
@Override
public HttpStatus getStatusCode() {
return HttpStatus.valueOf(this.channel.responseStatus().getCode());
}
@Override
public String toString() {
return "ReactorClientHttpResponse{" +
"request=" + this.channel.method() + " " + this.channel.uri().toString() + "," +
"status=" + getStatusCode() +
'}';
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import reactor.io.net.ReactiveNet;
import reactor.io.net.http.HttpClient;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* Create a {@link ClientHttpRequest} for the Reactor Net HTTP client
*
* @author Brian Clozel
*/
public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory {
private final DataBufferAllocator allocator;
private final HttpClient httpClient;
public ReactorHttpClientRequestFactory() {
this(new DefaultDataBufferAllocator());
}
public ReactorHttpClientRequestFactory(DataBufferAllocator allocator) {
this(allocator, ReactiveNet.httpClient());
}
protected ReactorHttpClientRequestFactory(DataBufferAllocator allocator, HttpClient httpClient) {
this.allocator = allocator;
this.httpClient = httpClient;
}
@Override
public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) {
Assert.notNull(httpMethod, "HTTP method is required");
Assert.notNull(uri, "request URI is required");
Assert.notNull(headers, "request headers are required");
return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers, this.allocator);
}
}