Proper closing of resources

This commit introduces a `doFinally` block that properly signals the
end of input for the non-blocking XML and JSON parsers.
This commit is contained in:
Arjen Poutsma 2017-06-29 16:48:53 +02:00
parent 9d7b8503d0
commit 58a5e7f17b
3 changed files with 17 additions and 3 deletions

View File

@ -82,8 +82,10 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(nonBlockingParser(), true);
Flux<TokenBuffer> tokens = Flux.from(input)
.flatMap(new Jackson2Tokenizer(nonBlockingParser(), true));
.flatMap(tokenizer)
.doFinally(t -> tokenizer.endOfInput());
return decodeInternal(tokens, elementType, mimeType, hints);
}
@ -92,8 +94,10 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(nonBlockingParser(), false);
Flux<TokenBuffer> tokens = Flux.from(input)
.flatMap(new Jackson2Tokenizer(nonBlockingParser(), false));
.flatMap(tokenizer)
.doFinally(t -> tokenizer.endOfInput());
return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty();
}

View File

@ -106,6 +106,10 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
}
}
public void endOfInput() {
this.inputFeeder.endOfInput();
}
private void calculateDepth(JsonToken token) {
switch (token) {
case START_OBJECT:

View File

@ -97,7 +97,9 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
Flux<DataBuffer> flux = Flux.from(inputStream);
if (useAalto) {
return flux.flatMap(new AaltoDataBufferToXmlEvent());
AaltoDataBufferToXmlEvent aaltoMapper = new AaltoDataBufferToXmlEvent();
return flux.flatMap(aaltoMapper)
.doFinally(signalType -> aaltoMapper.endOfInput());
}
else {
Mono<DataBuffer> singleBuffer = flux.reduce(DataBuffer::write);
@ -158,6 +160,10 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
DataBufferUtils.release(dataBuffer);
}
}
public void endOfInput() {
this.streamReader.getInputFeeder().endOfInput();
}
}
}