Add request handling infrastructure

HandlerMapping, HandlerAdapter, HandlerResultHandler (+ HandlerResult)
as the basic request handling contracts. DispatcherHandler to drive
overall request handling.

DispatcherApp provides minimal implementations of the above contracts
enough to put together a running example that returns
200 text/plain "Hello world".
This commit is contained in:
Rossen Stoyanchev 2015-08-17 15:54:06 -04:00
parent c682895dee
commit 773d0444bf
6 changed files with 520 additions and 0 deletions

View File

@ -0,0 +1,169 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
/**
* @author Rossen Stoyanchev
*/
public class DispatcherHttpHandler implements HttpHandler {
private List<HandlerMapping> handlerMappings;
private List<HandlerAdapter> handlerAdapters;
private List<HandlerResultHandler> resultHandlers;
protected void initStrategies(ApplicationContext context) {
this.handlerMappings = new ArrayList<>(BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false).values());
this.handlerAdapters = new ArrayList<>(BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false).values());
this.resultHandlers = new ArrayList<>(BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false).values());
}
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
Object handler = getHandler(request);
if (handler == null) {
// No exception handling mechanism yet
response.setStatusCode(HttpStatus.NOT_FOUND);
return Publishers.complete();
}
HandlerAdapter handlerAdapter = getHandlerAdapter(handler);
final Publisher<HandlerResult> resultPublisher = handlerAdapter.handle(request, response, handler);
return new Publisher<Void>() {
@Override
public void subscribe(final Subscriber<? super Void> subscriber) {
resultPublisher.subscribe(new Subscriber<HandlerResult>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(HandlerResult result) {
for (HandlerResultHandler resultHandler : resultHandlers) {
if (resultHandler.supports(result)) {
Publisher<Void> publisher = resultHandler.handleResult(request, response, result);
publisher.subscribe(new Subscriber<Void>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable error) {
// Result handling error (no exception handling mechanism yet)
subscriber.onError(error);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
}
}
@Override
public void onError(Throwable error) {
// Application handler error (no exception handling mechanism yet)
subscriber.onError(error);
}
@Override
public void onComplete() {
// do nothing
}
});
}
};
}
protected Object getHandler(ServerHttpRequest request) {
Object handler = null;
for (HandlerMapping handlerMapping : this.handlerMappings) {
handler = handlerMapping.getHandler(request);
if (handler != null) {
break;
}
}
return handler;
}
protected HandlerAdapter getHandlerAdapter(Object handler) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
return handlerAdapter;
}
}
// more specific exception
throw new IllegalStateException("No HandlerAdapter for " + handler);
}
private static class Publishers {
public static Publisher<Void> complete() {
return subscriber -> {
subscriber.onSubscribe(new NoopSubscription());
subscriber.onComplete();
};
}
}
private static class NoopSubscription implements Subscription {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import org.reactivestreams.Publisher;
/**
* @author Rossen Stoyanchev
*/
public interface HandlerAdapter {
boolean supports(Object handler);
Publisher<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response, Object handler);
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
/**
* @author Rossen Stoyanchev
*/
public interface HandlerMapping {
Object getHandler(ServerHttpRequest request);
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
/**
* @author Rossen Stoyanchev
*/
public class HandlerResult {
private final Object returnValue;
public HandlerResult(Object returnValue) {
this.returnValue = returnValue;
}
public Object getReturnValue() {
return this.returnValue;
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import org.reactivestreams.Publisher;
/**
* @author Rossen Stoyanchev
*/
public interface HandlerResultHandler {
boolean supports(HandlerResult result);
Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result);
}

View File

@ -0,0 +1,233 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.http.MediaType;
import org.springframework.reactive.web.rxnetty.RequestHandlerAdapter;
import org.springframework.web.context.support.StaticWebApplicationContext;
/**
* @author Rossen Stoyanchev
*/
public class DispatcherApp {
public static void main(String[] args) {
StaticWebApplicationContext wac = new StaticWebApplicationContext();
wac.registerSingleton("handlerMapping", SimpleUrlHandlerMapping.class);
wac.registerSingleton("handlerAdapter", PlainTextHandlerAdapter.class);
wac.registerSingleton("resultHandler", PlainTextResultHandler.class);
wac.refresh();
SimpleUrlHandlerMapping handlerMapping = wac.getBean(SimpleUrlHandlerMapping.class);
handlerMapping.addHandler("/text", new HelloWorldTextHandler());
DispatcherHttpHandler dispatcherHandler = new DispatcherHttpHandler();
dispatcherHandler.initStrategies(wac);
RequestHandlerAdapter requestHandler = new RequestHandlerAdapter(dispatcherHandler);
HttpServer<ByteBuf, ByteBuf> server = HttpServer.newServer(8080);
server.start(requestHandler::handle);
server.awaitShutdown();
}
private static class SimpleUrlHandlerMapping implements HandlerMapping {
private final Map<String, Object> handlerMap = new HashMap<>();
public void addHandler(String path, Object handler) {
this.handlerMap.put(path, handler);
}
@Override
public Object getHandler(ServerHttpRequest request) {
return this.handlerMap.get(request.getURI().getPath());
}
}
private interface PlainTextHandler {
Publisher<String> handle(ServerHttpRequest request, ServerHttpResponse response);
}
private static class HelloWorldTextHandler implements PlainTextHandler {
@Override
public Publisher<String> handle(ServerHttpRequest request, ServerHttpResponse response) {
return new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new AbstractSubscription<String>(subscriber) {
@Override
protected void requestInternal(long n) {
invokeOnNext("Hello world.");
invokeOnComplete();
}
});
}
};
}
}
private static class PlainTextHandlerAdapter implements HandlerAdapter {
@Override
public boolean supports(Object handler) {
return PlainTextHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Publisher<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response,
Object handler) {
PlainTextHandler textHandler = (PlainTextHandler) handler;
final Publisher<String> resultPublisher = textHandler.handle(request, response);
return new Publisher<HandlerResult>() {
@Override
public void subscribe(Subscriber<? super HandlerResult> handlerResultSubscriber) {
handlerResultSubscriber.onSubscribe(new AbstractSubscription<HandlerResult>(handlerResultSubscriber) {
@Override
protected void requestInternal(long n) {
resultPublisher.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object result) {
invokeOnNext(new HandlerResult(result));
}
@Override
public void onError(Throwable error) {
invokeOnError(error);
}
@Override
public void onComplete() {
invokeOnComplete();
}
});
}
});
}
};
}
}
private static class PlainTextResultHandler implements HandlerResultHandler {
@Override
public boolean supports(HandlerResult result) {
Object value = result.getReturnValue();
return (value != null && String.class.equals(value.getClass()));
}
@Override
public Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response,
HandlerResult result) {
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
return response.writeWith(new Publisher<byte[]>() {
@Override
public void subscribe(Subscriber<? super byte[]> writeSubscriber) {
writeSubscriber.onSubscribe(new AbstractSubscription<byte[]>(writeSubscriber) {
@Override
protected void requestInternal(long n) {
Charset charset = Charset.forName("UTF-8");
invokeOnNext(((String) result.getReturnValue()).getBytes(charset));
invokeOnComplete();
}
});
}
});
}
}
private static abstract class AbstractSubscription<T> implements Subscription {
private final Subscriber<? super T> subscriber;
private volatile boolean terminated;
public AbstractSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}
protected boolean isTerminated() {
return this.terminated;
}
@Override
public void request(long n) {
if (isTerminated()) {
return;
}
if (n > 0) {
requestInternal(n);
}
}
protected abstract void requestInternal(long n);
@Override
public void cancel() {
this.terminated = true;
}
protected void invokeOnNext(T data) {
this.subscriber.onNext(data);
}
protected void invokeOnError(Throwable error) {
this.terminated = true;
this.subscriber.onError(error);
}
protected void invokeOnComplete() {
this.terminated = true;
this.subscriber.onComplete();
}
}
}