This commit is contained in:
Dariusz Jędrzejczyk 2025-10-07 23:10:35 +03:00 committed by GitHub
commit 2cf6cbb6f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 44 additions and 32 deletions

View File

@ -10,7 +10,7 @@ dependencies {
api(platform("com.fasterxml.jackson:jackson-bom:2.20.0"))
api(platform("io.micrometer:micrometer-bom:1.16.0-M3"))
api(platform("io.netty:netty-bom:4.2.6.Final"))
api(platform("io.projectreactor:reactor-bom:2025.0.0-M7"))
api(platform("io.projectreactor:reactor-bom:2025.0.0-SNAPSHOT"))
api(platform("io.rsocket:rsocket-bom:1.1.5"))
api(platform("org.apache.groovy:groovy-bom:5.0.1"))
api(platform("org.apache.logging.log4j:log4j-bom:2.25.1"))

View File

@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@ -1170,7 +1171,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
if (invokeFailure.get()) {
return Mono.error(ex);
}
return (Mono) invokeOperation(invoker);
return (Mono) Objects.requireNonNull(invokeOperation(invoker));
}
catch (RuntimeException exception) {
return Mono.error(exception);
@ -1201,8 +1202,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
.switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))
.switchIfEmpty(Flux.defer(() -> (Flux) Objects.requireNonNull(evaluate(null, invoker, method, contexts))))
.flatMap(v -> Objects.requireNonNull(evaluate(valueToFlux(v, contexts), invoker, method, contexts)))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
@ -1216,8 +1217,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
else {
return adapter.fromPublisher(Mono.fromFuture(cachedFuture)
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))
// TODO: requireNonNull is not for
// free, perhaps we should
// suppress it
.switchIfEmpty(Mono.defer(() -> (Mono) Objects.requireNonNull(evaluate(null, invoker, method, contexts))))
.flatMap(v -> Objects.requireNonNull(evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);

View File

@ -17,6 +17,7 @@
package org.springframework.core.codec;
import java.util.Map;
import java.util.Objects;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
@ -84,15 +85,16 @@ public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.from(input).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
return Flux.from(input).map(buffer ->
Objects.requireNonNull(decodeDataBuffer(buffer, elementType, mimeType, hints)));
}
@Override
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return DataBufferUtils.join(input, this.maxInMemorySize)
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
return DataBufferUtils.join(input, this.maxInMemorySize).map(buffer ->
Objects.requireNonNull(decodeDataBuffer(buffer, elementType, mimeType, hints)));
}
/**

View File

@ -20,6 +20,7 @@ import java.lang.annotation.Annotation;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
@ -227,7 +228,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints)))
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
@ -241,7 +242,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints)))
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));

View File

@ -18,6 +18,7 @@ package org.springframework.messaging.rsocket;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import io.rsocket.Payload;
@ -291,7 +292,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return (Mono<T>) payloadMono.map(this::retainDataAndReleasePayload)
.map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
.map(dataBuffer -> Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)));
}
@Override
@ -317,7 +318,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer ->
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
(T) Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)));
}
private Mono<Payload> getPayloadMono() {

View File

@ -20,6 +20,7 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import io.r2dbc.spi.Connection;
@ -176,7 +177,7 @@ public class SingleConnectionFactory extends DelegatingConnectionFactory
this.connection =
(isSuppressClose() ? getCloseSuppressingConnectionProxy(connectionToUse) : connectionToUse);
}
return this.connection;
return Objects.requireNonNull(this.connection);
}).flatMap(this::prepareConnection);
}

View File

@ -17,6 +17,7 @@
package org.springframework.transaction.interceptor;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -903,7 +904,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
return (Mono<?>) Objects.requireNonNull(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);

View File

@ -301,7 +301,7 @@ abstract class AbstractCoroutinesTransactionAspectTests {
private fun checkReactiveTransaction(expected: Boolean) {
Mono.deferContextual{context -> Mono.just(context)}
.handle { context: ContextView, sink: SynchronousSink<Any?> ->
.handle { context: ContextView, sink: SynchronousSink<ContextView> ->
if (context.hasKey(TransactionContext::class.java) != expected) {
Fail.fail<Any>("Should have thrown NoTransactionException")
}

View File

@ -165,8 +165,9 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
return requestSender
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
responseRef.set(new ReactorClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get());
ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection);
responseRef.set(clientResponse);
return Mono.just((ClientHttpResponse) clientResponse);
})
.next()
.doOnCancel(() -> {

View File

@ -66,10 +66,10 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests<KotlinSerializa
pojo3
)
val pojoBytes = Cbor.Default.encodeToByteArray(arrayOf(pojo1, pojo2, pojo3))
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(pojoBytes)
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
@ -78,7 +78,7 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests<KotlinSerializa
fun encodeMono() {
val pojo = Pojo("foo", "bar")
val input = Mono.just(pojo)
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(Cbor.Default.encodeToByteArray(pojo))
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })

View File

@ -45,9 +45,9 @@ class CustomKotlinSerializationJsonEncoderTests :
@Test
override fun encode() {
val input = Mono.just(BigDecimal(1))
testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep<DataBuffer?> ->
testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep<DataBuffer> ->
step.consumeNextWith(expectString("1.0")
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}

View File

@ -76,10 +76,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests<KotlinSeria
pojo3
)
val pojoBytes = ProtoBuf.Default.encodeToByteArray(arrayOf(pojo1, pojo2, pojo3))
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(pojoBytes)
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
@ -88,10 +88,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests<KotlinSeria
fun encodeMono() {
val pojo = Pojo("foo", "bar")
val input = Mono.just(pojo)
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(ProtoBuf.Default.encodeToByteArray(pojo))
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}

View File

@ -53,11 +53,10 @@ public class SessionAttributeMethodArgumentResolver extends AbstractNamedValueAr
return new NamedValueInfo(ann.name(), ann.required(), ValueConstants.DEFAULT_NONE);
}
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
@Override
protected Mono<Object> resolveName(String name, MethodParameter parameter, ServerWebExchange exchange) {
return exchange.getSession()
.filter(session -> session.getAttribute(name) != null)
.map(session -> session.getAttribute(name));
return exchange.getSession().mapNotNull(session -> session.getAttribute(name));
}
@Override

View File

@ -739,7 +739,8 @@ class CoRouterFunctionDsl internal constructor (private val init: (CoRouterFunct
}
@PublishedApi
internal fun <T> asMono(request: ServerRequest, context: CoroutineContext = Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T): Mono<T> {
internal fun <T : Any> asMono(request: ServerRequest, context: CoroutineContext =
Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T?): Mono<T> {
return mono(context) {
contextProvider?.let {
withContext(it.invoke(request)) {

View File

@ -381,7 +381,8 @@ class InvocableHandlerMethodKotlinTests {
StepVerifier.create(mono)
.consumeNextWith {
if (it.returnValue is Mono<*>) {
StepVerifier.create(it.returnValue as Mono<*>).expectNext(expected).verifyComplete()
StepVerifier.create(it.returnValue as Mono<*>).expectNext(requireNotNull(expected))
.verifyComplete()
} else {
assertThat(it.returnValue).isEqualTo(expected)
}