Revert "Remove mandatory dependency on Reactor Stream"

This reverts commit d5e6f70483d4a6c8af3cc5e97e52a54e98199169.
This commit is contained in:
Rossen Stoyanchev 2016-01-07 15:23:55 -05:00
parent 318092cfd0
commit c3a8bf4d17
3 changed files with 11 additions and 46 deletions

View File

@ -12,45 +12,8 @@ apply plugin: 'propdeps'
apply plugin: 'propdeps-idea'
apply plugin: 'propdeps-maven'
ext {
springVersion = '4.2.3.RELEASE'
reactorVersion = '2.5.0.BUILD-SNAPSHOT'
tomcatVersion = '8.0.28'
jettyVersion = '9.3.5.v20151012'
}
configurations {
jarjar
reactorstream
}
task reactorstreamRepackJar(type: Jar) { repackJar ->
repackJar.baseName = "spring-reactive-reactorstream-repack"
repackJar.version = reactorVersion
doLast() {
project.ant {
taskdef name: "jarjar", classname: "com.tonicsystems.jarjar.JarJarTask",
classpath: configurations.jarjar.asPath
jarjar(destfile: repackJar.archivePath) {
configurations.reactorstream.each { originalJar ->
zipfileset(src: originalJar)
}
// repackage reactor. => org.springframework.reactor
rule(pattern: "reactor.rx.**", result: "org.springframework.reactor.rx.@1")
}
}
}
}
jar {
baseName = 'spring-reactive'
dependsOn reactorstreamRepackJar
from(zipTree(reactorstreamRepackJar.archivePath)) {
include "reactor/rx/subscriber/BlockingQueueSubscriber.java"
}
}
group = 'org.springframework.reactive'
@ -67,14 +30,19 @@ configurations.all {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
}
ext {
springVersion = '4.2.3.RELEASE'
reactorVersion = '2.5.0.BUILD-SNAPSHOT'
tomcatVersion = '8.0.28'
jettyVersion = '9.3.5.v20151012'
}
dependencies {
compile "org.springframework:spring-core:${springVersion}"
compile "org.springframework:spring-web:${springVersion}"
compile "org.reactivestreams:reactive-streams:1.0.0"
compile "io.projectreactor:reactor-core:${reactorVersion}"
compile "commons-logging:commons-logging:1.2"
reactorstream("io.projectreactor:reactor-stream:${reactorVersion}@jar")
compile(files(reactorstreamRepackJar))
optional 'io.reactivex:rxjava:1.1.0'
optional "io.reactivex:rxnetty-http:0.5.0-SNAPSHOT"
@ -89,8 +57,6 @@ dependencies {
provided "javax.servlet:javax.servlet-api:3.1.0"
jarjar("com.googlecode.jarjar:jarjar:1.3")
testCompile "junit:junit:4.12"
testCompile "org.springframework:spring-test:${springVersion}"
testCompile "org.slf4j:slf4j-jcl:1.7.12"

View File

@ -19,13 +19,11 @@ package org.springframework.util;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.reactor.rx.subscriber.BlockingQueueSubscriber;
import reactor.rx.Streams;
/**
* {@code InputStream} implementation based on a byte array {@link Publisher}.
@ -62,7 +60,7 @@ public class ByteBufferPublisherInputStream extends InputStream {
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher, int requestSize) {
Assert.notNull(publisher, "'publisher' must not be null");
this.queue = new BlockingQueueSubscriber<>(publisher, null, new ArrayBlockingQueue<>(requestSize), false, requestSize);
this.queue = Streams.from(publisher).toBlockingQueue(requestSize);
}

View File

@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
import org.springframework.http.MediaType;
import org.springframework.util.BufferOutputStream;
@ -72,7 +73,7 @@ public class XmlHandler implements HttpHandler {
bos.close();
buffer.flip();
return response.setBody(Mono.just(buffer.byteBuffer()));
return response.setBody(Streams.just(buffer.byteBuffer()));
}
catch (Exception ex) {
logger.error(ex, ex);