Adapt to API changes in the latest reactor snapshot
This commit is contained in:
parent
93b53dae29
commit
673f83e388
|
@ -63,7 +63,7 @@ public class AbstractRoutingConnectionFactoryUnitTests {
|
||||||
connectionFactory.afterPropertiesSet();
|
connectionFactory.afterPropertiesSet();
|
||||||
|
|
||||||
connectionFactory.determineTargetConnectionFactory()
|
connectionFactory.determineTargetConnectionFactory()
|
||||||
.subscriberContext(Context.of(ROUTING_KEY, "key"))
|
.contextWrite(Context.of(ROUTING_KEY, "key"))
|
||||||
.as(StepVerifier::create)
|
.as(StepVerifier::create)
|
||||||
.expectNext(routedConnectionFactory)
|
.expectNext(routedConnectionFactory)
|
||||||
.verifyComplete();
|
.verifyComplete();
|
||||||
|
@ -109,7 +109,7 @@ public class AbstractRoutingConnectionFactoryUnitTests {
|
||||||
connectionFactory.afterPropertiesSet();
|
connectionFactory.afterPropertiesSet();
|
||||||
|
|
||||||
connectionFactory.determineTargetConnectionFactory()
|
connectionFactory.determineTargetConnectionFactory()
|
||||||
.subscriberContext(Context.of(ROUTING_KEY, "unknown"))
|
.contextWrite(Context.of(ROUTING_KEY, "unknown"))
|
||||||
.as(StepVerifier::create)
|
.as(StepVerifier::create)
|
||||||
.verifyError(IllegalStateException.class);
|
.verifyError(IllegalStateException.class);
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ public class AbstractRoutingConnectionFactoryUnitTests {
|
||||||
connectionFactory.afterPropertiesSet();
|
connectionFactory.afterPropertiesSet();
|
||||||
|
|
||||||
connectionFactory.determineTargetConnectionFactory()
|
connectionFactory.determineTargetConnectionFactory()
|
||||||
.subscriberContext(Context.of(ROUTING_KEY, "unknown"))
|
.contextWrite(Context.of(ROUTING_KEY, "unknown"))
|
||||||
.as(StepVerifier::create)
|
.as(StepVerifier::create)
|
||||||
.expectNext(defaultConnectionFactory)
|
.expectNext(defaultConnectionFactory)
|
||||||
.verifyComplete();
|
.verifyComplete();
|
||||||
|
@ -153,7 +153,7 @@ public class AbstractRoutingConnectionFactoryUnitTests {
|
||||||
connectionFactory.afterPropertiesSet();
|
connectionFactory.afterPropertiesSet();
|
||||||
|
|
||||||
connectionFactory.determineTargetConnectionFactory()
|
connectionFactory.determineTargetConnectionFactory()
|
||||||
.subscriberContext(Context.of(ROUTING_KEY, "my-key"))
|
.contextWrite(Context.of(ROUTING_KEY, "my-key"))
|
||||||
.as(StepVerifier::create)
|
.as(StepVerifier::create)
|
||||||
.expectNext(routedConnectionFactory)
|
.expectNext(routedConnectionFactory)
|
||||||
.verifyComplete();
|
.verifyComplete();
|
||||||
|
@ -168,7 +168,7 @@ public class AbstractRoutingConnectionFactoryUnitTests {
|
||||||
connectionFactory.afterPropertiesSet();
|
connectionFactory.afterPropertiesSet();
|
||||||
|
|
||||||
connectionFactory.determineTargetConnectionFactory()
|
connectionFactory.determineTargetConnectionFactory()
|
||||||
.subscriberContext(Context.of(ROUTING_KEY, "lookup-key"))
|
.contextWrite(Context.of(ROUTING_KEY, "lookup-key"))
|
||||||
.as(StepVerifier::create)
|
.as(StepVerifier::create)
|
||||||
.expectNext(defaultConnectionFactory)
|
.expectNext(defaultConnectionFactory)
|
||||||
.verifyComplete();
|
.verifyComplete();
|
||||||
|
@ -177,7 +177,7 @@ public class AbstractRoutingConnectionFactoryUnitTests {
|
||||||
connectionFactory.afterPropertiesSet();
|
connectionFactory.afterPropertiesSet();
|
||||||
|
|
||||||
connectionFactory.determineTargetConnectionFactory()
|
connectionFactory.determineTargetConnectionFactory()
|
||||||
.subscriberContext(Context.of(ROUTING_KEY, "lookup-key"))
|
.contextWrite(Context.of(ROUTING_KEY, "lookup-key"))
|
||||||
.as(StepVerifier::create)
|
.as(StepVerifier::create)
|
||||||
.expectNext(routedConnectionFactory)
|
.expectNext(routedConnectionFactory)
|
||||||
.verifyComplete();
|
.verifyComplete();
|
||||||
|
@ -187,8 +187,12 @@ public class AbstractRoutingConnectionFactoryUnitTests {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Mono<Object> determineCurrentLookupKey() {
|
protected Mono<Object> determineCurrentLookupKey() {
|
||||||
return Mono.subscriberContext().filter(context -> context.hasKey(ROUTING_KEY))
|
return Mono.deferContextual(context -> {
|
||||||
.map(context -> context.get(ROUTING_KEY));
|
if (context.hasKey(ROUTING_KEY)) {
|
||||||
|
return Mono.just(context.get(ROUTING_KEY));
|
||||||
|
}
|
||||||
|
return Mono.empty();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -887,8 +887,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
// target invocation exception
|
// target invocation exception
|
||||||
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
|
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
|
||||||
}
|
}
|
||||||
})).subscriberContext(TransactionContextManager.getOrCreateContext())
|
})).contextWrite(TransactionContextManager.getOrCreateContext())
|
||||||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
|
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Any other reactive type, typically a Flux
|
// Any other reactive type, typically a Flux
|
||||||
|
@ -917,8 +917,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
||||||
// target invocation exception
|
// target invocation exception
|
||||||
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
|
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
|
||||||
}
|
}
|
||||||
})).subscriberContext(TransactionContextManager.getOrCreateContext())
|
})).contextWrite(TransactionContextManager.getOrCreateContext())
|
||||||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()));
|
.contextWrite(TransactionContextManager.getOrCreateContextHolder()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
|
|
|
@ -50,19 +50,17 @@ public abstract class TransactionContextManager {
|
||||||
* or no context found in a holder
|
* or no context found in a holder
|
||||||
*/
|
*/
|
||||||
public static Mono<TransactionContext> currentContext() throws NoTransactionException {
|
public static Mono<TransactionContext> currentContext() throws NoTransactionException {
|
||||||
return Mono.subscriberContext().handle((ctx, sink) -> {
|
return Mono.deferContextual(ctx -> {
|
||||||
if (ctx.hasKey(TransactionContext.class)) {
|
if (ctx.hasKey(TransactionContext.class)) {
|
||||||
sink.next(ctx.get(TransactionContext.class));
|
return Mono.just(ctx.get(TransactionContext.class));
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if (ctx.hasKey(TransactionContextHolder.class)) {
|
if (ctx.hasKey(TransactionContextHolder.class)) {
|
||||||
TransactionContextHolder holder = ctx.get(TransactionContextHolder.class);
|
TransactionContextHolder holder = ctx.get(TransactionContextHolder.class);
|
||||||
if (holder.hasContext()) {
|
if (holder.hasContext()) {
|
||||||
sink.next(holder.currentContext());
|
return Mono.just(holder.currentContext());
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sink.error(new NoTransactionInContextException());
|
return Mono.error(new NoTransactionInContextException());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,8 +82,8 @@ final class TransactionalOperatorImpl implements TransactionalOperator {
|
||||||
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback)
|
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback)
|
||||||
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
|
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
|
||||||
})
|
})
|
||||||
.subscriberContext(TransactionContextManager.getOrCreateContext())
|
.contextWrite(TransactionContextManager.getOrCreateContext())
|
||||||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
|
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -104,8 +104,8 @@ final class TransactionalOperatorImpl implements TransactionalOperator {
|
||||||
.onErrorResume(ex ->
|
.onErrorResume(ex ->
|
||||||
rollbackOnException(it, ex).then(Mono.error(ex))));
|
rollbackOnException(it, ex).then(Mono.error(ex))));
|
||||||
})
|
})
|
||||||
.subscriberContext(TransactionContextManager.getOrCreateContext())
|
.contextWrite(TransactionContextManager.getOrCreateContext())
|
||||||
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
|
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -47,7 +47,7 @@ public class ServerWebExchangeContextFilter implements WebFilter {
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
|
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
|
||||||
return chain.filter(exchange)
|
return chain.filter(exchange)
|
||||||
.subscriberContext(cxt -> cxt.put(EXCHANGE_CONTEXT_ATTRIBUTE, exchange));
|
.contextWrite(cxt -> cxt.put(EXCHANGE_CONTEXT_ATTRIBUTE, exchange));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue