MessagingRSocket correctly handles unconsumed input

Closes gh-24741
This commit is contained in:
Rossen Stoyanchev 2020-03-20 10:03:56 +00:00
parent 0d42a1bd7f
commit 7efb62091d
2 changed files with 11 additions and 3 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -174,7 +174,7 @@ class MessagingRSocket extends AbstractRSocket {
.doFinally(s -> {
// Subscription should have happened by now due to ChannelSendOperator
if (!read.get()) {
buffers.subscribe(DataBufferUtils::release);
firstPayload.release();
}
})
.thenMany(Flux.defer(() -> replyMono.isTerminated() ?

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,6 +27,7 @@ import io.rsocket.AbstractRSocket;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.netty.server.CloseableChannel;
@ -149,6 +150,13 @@ class RSocketBufferLeakTests {
StepVerifier.create(result).expectNext("bar").thenCancel().verify(Duration.ofSeconds(5));
}
@Test // gh-24741
void noSuchRouteOnChannelInteraction() {
Flux<String> input = Flux.just("foo", "bar", "baz");
Flux<String> result = requester.route("no-such-route").data(input).retrieveFlux(String.class);
StepVerifier.create(result).expectError(ApplicationErrorException.class).verify(Duration.ofSeconds(5));
}
@Controller
static class ServerController {