Fix memory leaks in ProtobufDecoder

Issue: SPR-17418
This commit is contained in:
Arjen Poutsma 2018-10-22 13:39:22 +02:00
parent a64e85fcc6
commit 946ec7e22e
3 changed files with 32 additions and 29 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2017 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -66,9 +66,12 @@ public abstract class AbstractDataBufferAllocatingTestCase {
} }
protected DataBuffer stringBuffer(String value) { protected DataBuffer stringBuffer(String value) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8); return byteBuffer(value.getBytes(StandardCharsets.UTF_8));
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length); }
buffer.write(bytes);
protected DataBuffer byteBuffer(byte[] value) {
DataBuffer buffer = this.bufferFactory.allocateBuffer(value.length);
buffer.write(value);
return buffer; return buffer;
} }

View File

@ -216,12 +216,18 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
} while (remainingBytesToRead > 0); } while (remainingBytesToRead > 0);
return messages; return messages;
} }
catch (DecodingException ex) {
throw ex;
}
catch (IOException ex) { catch (IOException ex) {
throw new DecodingException("I/O error while parsing input stream", ex); throw new DecodingException("I/O error while parsing input stream", ex);
} }
catch (Exception ex) { catch (Exception ex) {
throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex); throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex);
} }
finally {
DataBufferUtils.release(input);
}
} }
} }

View File

@ -36,8 +36,7 @@ import org.springframework.protobuf.SecondMsg;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import static org.springframework.core.ResolvableType.forClass; import static org.springframework.core.ResolvableType.forClass;
/** /**
@ -81,7 +80,7 @@ public class ProtobufDecoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void decodeToMono() { public void decodeToMono() {
DataBuffer data = this.bufferFactory.wrap(testMsg.toByteArray()); DataBuffer data = byteBuffer(testMsg.toByteArray());
ResolvableType elementType = forClass(Msg.class); ResolvableType elementType = forClass(Msg.class);
Mono<Message> mono = this.decoder.decodeToMono(Flux.just(data), elementType, null, emptyMap()); Mono<Message> mono = this.decoder.decodeToMono(Flux.just(data), elementType, null, emptyMap());
@ -106,12 +105,12 @@ public class ProtobufDecoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void decodeChunksToMono() { public void decodeChunksToMono() {
DataBuffer buffer = this.bufferFactory.wrap(testMsg.toByteArray()); DataBuffer buffer = byteBuffer(testMsg.toByteArray());
Flux<DataBuffer> chunks = Flux.just( Flux<DataBuffer> chunks = Flux.just(
buffer.slice(0, 4), DataBufferUtils.retain(buffer.slice(0, 4)),
buffer.slice(4, buffer.readableByteCount() - 4)); DataBufferUtils.retain(buffer.slice(4, buffer.readableByteCount() - 4)));
DataBufferUtils.retain(buffer);
ResolvableType elementType = forClass(Msg.class); ResolvableType elementType = forClass(Msg.class);
release(buffer);
Mono<Message> mono = this.decoder.decodeToMono(chunks, elementType, null, Mono<Message> mono = this.decoder.decodeToMono(chunks, elementType, null,
emptyMap()); emptyMap());
@ -123,10 +122,11 @@ public class ProtobufDecoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void decode() throws IOException { public void decode() throws IOException {
DataBuffer buffer = bufferFactory.allocateBuffer(); DataBuffer buffer = this.bufferFactory.allocateBuffer();
testMsg.writeDelimitedTo(buffer.asOutputStream()); testMsg.writeDelimitedTo(buffer.asOutputStream());
DataBuffer buffer2 = bufferFactory.allocateBuffer(); DataBuffer buffer2 = this.bufferFactory.allocateBuffer();
testMsg2.writeDelimitedTo(buffer2.asOutputStream()); testMsg2.writeDelimitedTo(buffer2.asOutputStream());
Flux<DataBuffer> source = Flux.just(buffer, buffer2); Flux<DataBuffer> source = Flux.just(buffer, buffer2);
ResolvableType elementType = forClass(Msg.class); ResolvableType elementType = forClass(Msg.class);
@ -136,22 +136,22 @@ public class ProtobufDecoderTests extends AbstractDataBufferAllocatingTestCase {
.expectNext(testMsg) .expectNext(testMsg)
.expectNext(testMsg2) .expectNext(testMsg2)
.verifyComplete(); .verifyComplete();
DataBufferUtils.release(buffer);
DataBufferUtils.release(buffer2);
} }
@Test @Test
public void decodeSplitChunks() throws IOException { public void decodeSplitChunks() throws IOException {
DataBuffer buffer = bufferFactory.allocateBuffer(); DataBuffer buffer = this.bufferFactory.allocateBuffer();
testMsg.writeDelimitedTo(buffer.asOutputStream()); testMsg.writeDelimitedTo(buffer.asOutputStream());
DataBuffer buffer2 = bufferFactory.allocateBuffer(); DataBuffer buffer2 = this.bufferFactory.allocateBuffer();
testMsg2.writeDelimitedTo(buffer2.asOutputStream()); testMsg2.writeDelimitedTo(buffer2.asOutputStream());
Flux<DataBuffer> chunks = Flux.just( Flux<DataBuffer> chunks = Flux.just(
buffer.slice(0, 4), DataBufferUtils.retain(buffer.slice(0, 4)),
buffer.slice(4, buffer.readableByteCount() - 4), DataBufferUtils.retain(buffer.slice(4, buffer.readableByteCount() - 4)),
buffer2.slice(0, 2), DataBufferUtils.retain(buffer2.slice(0, 2)),
buffer2.slice(2, buffer2.readableByteCount() - 2)); DataBufferUtils.retain(buffer2
.slice(2, buffer2.readableByteCount() - 2)));
release(buffer, buffer2);
ResolvableType elementType = forClass(Msg.class); ResolvableType elementType = forClass(Msg.class);
Flux<Message> messages = this.decoder.decode(chunks, elementType, null, emptyMap()); Flux<Message> messages = this.decoder.decode(chunks, elementType, null, emptyMap());
@ -160,9 +160,6 @@ public class ProtobufDecoderTests extends AbstractDataBufferAllocatingTestCase {
.expectNext(testMsg) .expectNext(testMsg)
.expectNext(testMsg2) .expectNext(testMsg2)
.verifyComplete(); .verifyComplete();
DataBufferUtils.release(buffer);
DataBufferUtils.release(buffer2);
} }
@Test @Test
@ -178,15 +175,12 @@ public class ProtobufDecoderTests extends AbstractDataBufferAllocatingTestCase {
.expectNext(testMsg) .expectNext(testMsg)
.expectNext(testMsg) .expectNext(testMsg)
.verifyComplete(); .verifyComplete();
DataBufferUtils.release(buffer);
} }
@Test @Test
public void exceedMaxSize() { public void exceedMaxSize() {
this.decoder.setMaxMessageSize(1); this.decoder.setMaxMessageSize(1);
byte[] body = testMsg.toByteArray(); Flux<DataBuffer> source = Flux.just(byteBuffer(testMsg.toByteArray()));
Flux<DataBuffer> source = Flux.just(this.bufferFactory.wrap(body));
ResolvableType elementType = forClass(Msg.class); ResolvableType elementType = forClass(Msg.class);
Flux<Message> messages = this.decoder.decode(source, elementType, null, Flux<Message> messages = this.decoder.decode(source, elementType, null,
emptyMap()); emptyMap());