Fixed stalling issue in RequestBodyPublisher.

This commit is contained in:
Arjen Poutsma 2015-09-04 13:33:02 +02:00
parent c1f179677a
commit 0ec29d1c67
4 changed files with 159 additions and 3 deletions

View File

@ -98,6 +98,9 @@ final class AsyncContextSynchronizer {
}
}
/**
* Completes both the reading and writing side of the asynchronous operation.
*/
public void complete() {
readComplete();
writeComplete();

View File

@ -47,6 +47,8 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
private Subscriber<? super byte[]> subscriber;
private boolean stalled;
public RequestBodyPublisher(AsyncContextSynchronizer synchronizer, int bufferSize) {
this.synchronizer = synchronizer;
this.buffer = new byte[bufferSize];
@ -61,16 +63,18 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
@Override
public void onDataAvailable() throws IOException {
ServletInputStream input = this.synchronizer.getInputStream();
logger.debug("onDataAvailable: " + input);
while (true) {
logger.debug("Demand: " + this.demand);
if (!demand.hasDemand()) {
stalled = true;
break;
}
boolean ready = input.isReady();
logger.debug("Input " + ready + "/" + input.isFinished());
logger.debug("Input ready: " + ready + " finished: " + input.isFinished());
if (!ready) {
break;
@ -117,11 +121,12 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
public void request(long n) {
logger.debug("Updating demand " + demand + " by " + n);
boolean stalled = !demand.hasDemand();
demand.increase(n);
logger.debug("Stalled: " + stalled);
if (stalled) {
stalled = false;
try {
onDataAvailable();
}

View File

@ -0,0 +1,83 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.rx.Streams;
import static org.junit.Assert.assertEquals;
/**
* @author Arjen Poutsma
*/
public class RandomHandler implements HttpHandler {
private static final Log logger = LogFactory.getLog(RandomHandler.class);
public static final int RESPONSE_SIZE = 4096 * 3;
private final Random rnd = new Random();
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
request.getBody().subscribe(new Subscriber<byte[]>() {
private Subscription s;
private int requestSize = 0;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(byte[] bytes) {
requestSize += bytes.length;
}
@Override
public void onError(Throwable t) {
logger.error(t);
}
@Override
public void onComplete() {
logger.debug("Complete");
assertEquals(RandomHandlerIntegrationTests.REQUEST_SIZE, requestSize);
}
});
response.getHeaders().setContentLength(RESPONSE_SIZE);
return response.writeWith(Streams.<byte[]>just(randomBytes()));
}
private byte[] randomBytes() {
byte[] buffer = new byte[RESPONSE_SIZE];
rnd.nextBytes(buffer);
return buffer;
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http;
import java.net.URI;
import java.util.Random;
import org.junit.Test;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests {
public static final int REQUEST_SIZE = 4096 * 3;
private Random rnd = new Random();
@Override
protected RandomHandler createHttpHandler() {
return new RandomHandler();
}
@Test
public void random() throws Exception {
RestTemplate restTemplate = new RestTemplate();
byte[] body = randomBytes();
RequestEntity<byte[]> request = RequestEntity.post(new URI("http://localhost:" + port)).body(body);
ResponseEntity<byte[]> response = restTemplate.exchange(request, byte[].class);
assertNotNull(response.getBody());
assertEquals(RandomHandler.RESPONSE_SIZE,
response.getHeaders().getContentLength());
assertEquals(RandomHandler.RESPONSE_SIZE, response.getBody().length);
}
private byte[] randomBytes() {
byte[] buffer = new byte[REQUEST_SIZE];
rnd.nextBytes(buffer);
return buffer;
}
}