update consume to subscribe

update after to then (WIP flux)
This commit is contained in:
Stephane Maldini 2016-04-27 21:06:45 +01:00
parent 72b66c9715
commit 09fdc8a384
12 changed files with 19 additions and 19 deletions

View File

@ -76,14 +76,14 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
Mono<Void> mono = Mono.empty();
if (this.state.compareAndSet(State.NEW, State.COMMITTING)) {
for (Supplier<? extends Mono<Void>> action : this.beforeCommitActions) {
mono = mono.after(() -> action.get());
mono = mono.then(() -> action.get());
}
return mono
.otherwise(ex -> {
// Ignore errors from beforeCommit actions
return Mono.empty();
})
.after(() -> {
.then(() -> {
this.state.set(State.COMITTED);
//writeHeaders();
//writeCookies();

View File

@ -106,14 +106,14 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
channel.removeTransferEncodingChunked();
}
return applyBeforeCommit()
.after(() -> {
.then(() -> {
getHeaders().entrySet().stream().forEach(e ->
channel.headers().set(e.getKey(), e.getValue()));
getCookies().values().stream().flatMap(Collection::stream).forEach(cookie ->
channel.addCookie(new DefaultCookie(cookie.getName(), cookie.getValue())));
return Mono.empty();
})
.after(() -> {
.then(() -> {
if (body != null) {
return channel.send(body);
}

View File

@ -103,7 +103,7 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
.createRequest(io.netty.handler.codec.http.HttpMethod.valueOf(this.httpMethod.name()), uri.getRawPath());
return applyBeforeCommit()
.after(() -> Mono.just(request))
.then(() -> Mono.just(request))
.map(req -> {
for (Map.Entry<String, List<String>> entry : getHeaders().entrySet()) {
for (String value : entry.getValue()) {

View File

@ -91,20 +91,20 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
@Override
public Mono<Void> setBody(Publisher<DataBuffer> publisher) {
return new ChannelSendOperator<>(publisher, writePublisher ->
applyBeforeCommit().after(() -> setBodyInternal(writePublisher)));
applyBeforeCommit().then(() -> setBodyInternal(writePublisher)));
}
protected Mono<Void> applyBeforeCommit() {
Mono<Void> mono = Mono.empty();
if (this.state.compareAndSet(STATE_NEW, STATE_COMMITTING)) {
for (Supplier<? extends Mono<Void>> action : this.beforeCommitActions) {
mono = mono.after(action);
mono = mono.then(action);
}
mono = mono.otherwise(ex -> {
// Ignore errors from beforeCommit actions
return Mono.empty();
});
mono = mono.after(() -> {
mono = mono.then(() -> {
this.state.set(STATE_COMMITTED);
writeHeaders();
writeCookies();

View File

@ -106,7 +106,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
@Override
public Mono<Void> setBody(File file, long position, long count) {
return applyBeforeCommit().after(() -> {
return applyBeforeCommit().then(() -> {
return this.channel.sendFile(file, position, count);
});
}

View File

@ -67,7 +67,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
Observable<ByteBuf> content =
RxJava1ObservableConverter.from(publisher).map(this::toByteBuf);
Observable<Void> completion = this.response.write(content);
return RxJava1ObservableConverter.from(completion).after();
return RxJava1ObservableConverter.from(completion).then();
}
private ByteBuf toByteBuf(DataBuffer buffer) {
@ -132,7 +132,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
FileRegion fileRegion = new DefaultFileRegion(file, position, count);
Mono<Void> fileWrite = MonoChannelFuture.from(channel.writeAndFlush(fileRegion));
return Flux.concat(applyBeforeCommit(), responseWrite, fileWrite).after();
return Flux.concat(applyBeforeCommit(), responseWrite, fileWrite).then();
}
*/
}

View File

@ -82,7 +82,7 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
return Mono.empty();
})
.after(response::setComplete);
.then(response::setComplete);
}
protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) {

View File

@ -110,7 +110,7 @@ public class DefaultWebSessionManager implements WebSessionManager {
protected Mono<WebSession> validateSession(ServerWebExchange exchange, WebSession session) {
if (session.isExpired()) {
this.sessionIdResolver.setSessionId(exchange, "");
return this.sessionStore.removeSession(session.getId()).after(Mono::empty);
return this.sessionStore.removeSession(session.getId()).then(Mono::empty);
}
else {
return Mono.just(session);

View File

@ -73,7 +73,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
@Override
public Mono<Void> setBody(Publisher<DataBuffer> body) {
this.body = body;
return Flux.from(this.body).after();
return Flux.from(this.body).then();
}
@Override

View File

@ -170,7 +170,7 @@ public class ServerHttpResponseTests {
return Flux.from(publisher).map(b -> {
this.content.add(b);
return b;
}).after();
}).then();
}
}

View File

@ -572,7 +572,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/stream-create")
public Publisher<Void> streamCreate(@RequestBody Flux<Person> personStream) {
return personStream.toList().doOnSuccess(persons::addAll).after();
return personStream.toList().doOnSuccess(persons::addAll).then();
}
@RequestMapping("/person-capitalize")
@ -598,12 +598,12 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/publisher-create")
public Publisher<Void> publisherCreate(@RequestBody Publisher<Person> personStream) {
return Flux.from(personStream).doOnNext(persons::add).after();
return Flux.from(personStream).doOnNext(persons::add).then();
}
@RequestMapping("/flux-create")
public Mono<Void> fluxCreate(@RequestBody Flux<Person> personStream) {
return personStream.doOnNext(persons::add).after();
return personStream.doOnNext(persons::add).then();
}
@RequestMapping("/observable-create")

View File

@ -160,7 +160,7 @@ public class WebSessionIntegrationTests extends AbstractHttpHandlerIntegrationTe
map.put("counter", value);
this.currentValue.set(value);
return session;
}).after();
}).then();
}
}