Compare commits

...

12 Commits

Author SHA1 Message Date
Vincent Lee 0ea2d37ff6
Merge 493f90d6ba into 4a419818af 2025-09-16 13:42:26 +08:00
manojks1999 4a419818af
perf: improved Date formatter (#3466)
Sentinel CI / build (11) (push) Has been cancelled Details
Sentinel CI / build (17) (push) Has been cancelled Details
Sentinel CI / build (21) (push) Has been cancelled Details
Sentinel CI / build (8) (push) Has been cancelled Details
CodeQL / Analyze (java) (push) Has been cancelled Details
document-lint / document-lint (push) Has been cancelled Details
* Improved Date formatter

* change in naming conv

* change in datetime format

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-09-16 09:42:41 +08:00
Heramb Joshi d9398b4f75
chore(test): fixed offset syntax by using a correct method. (#3459) 2025-09-15 19:34:41 +08:00
uuuyuqi 4cad353dda
feat(adapter): new SpringCloudGateway v6 adapter for supporting the SpringCloud 2025 version (#3542)
* feat: support Spring Cloud Alibaba 2025

* Add maven property to fix ci failure

* doc: Update README.md
2025-08-25 15:11:46 +08:00
mawen ca596457b0
refactor: remove repeat check code (#3501) 2025-08-19 19:27:34 +08:00
DepUpdater 9fa818cbc9
chore(deps): bump slf4j-api version to 2.0.16(#3536) 2025-08-19 11:42:36 +08:00
LearningGp 5473ad7b3e
fix(README.md): Fix MD059 issue (#3539) 2025-08-07 15:38:27 +08:00
liwen.2022 493f90d6ba feat: add prefetch count to cluster limiter
Change-Id: Id9ed6e813a389fa494fc63bae4ec882d873a0fe3
2024-07-24 21:05:58 +08:00
liwen.2022 dba80f8719 feat: handle wait for cluster limiter
Change-Id: Ic2b76270fc868a451ddb06045c4e1321e71d12ec
2024-07-17 11:33:22 +08:00
absolute8511 de16134388 fix: reset stats should also reset rollingCounterInMinute
Change-Id: I86e004d9af1a70011dc9efd630735e4ee9b495e6
2024-04-30 18:07:55 +08:00
absolute8511 6a149f7091 feat: add fallback stats for cluster limit
Change-Id: I110062ea817430e8e52779d5c86804f5a476e13f
2024-04-30 14:58:17 +08:00
absolute8511 109c3b4c6d feat: add local token cache for cluster
Change-Id: I6b1ece200cd6b2db181f570ab7207325f12fc12b
2024-04-23 20:47:30 +08:00
46 changed files with 2060 additions and 25 deletions

View File

@ -27,6 +27,7 @@
<module>sentinel-spring-webflux-adapter</module>
<module>sentinel-api-gateway-adapter-common</module>
<module>sentinel-spring-cloud-gateway-adapter</module>
<module>sentinel-spring-cloud-gateway-v6x-adapter</module>
<module>sentinel-web-adapter-common</module>
<module>sentinel-spring-webmvc-adapter</module>
<module>sentinel-spring-webmvc-v6x-adapter</module>

View File

@ -31,7 +31,7 @@ If you don't want the filters enabled, you can manually disable them. For exampl
<dubbo:provider filter="-sentinel.dubbo.provider.filter"/>
```
For more details of Dubbo filter, see [here](http://dubbo.apache.org/en-us/docs/dev/impls/filter.html).
For more details of Dubbo filter, see [Dubbo filter documentation](https://cn.dubbo.apache.org/en/overview/mannual/java-sdk/tasks/extensibility/filter/).
## Dubbo resources

View File

@ -32,7 +32,7 @@ If you don't want the filters enabled, you can manually disable them. For exampl
<dubbo:provider filter="-sentinel.dubbo.provider.filter"/>
```
For more details of Dubbo filter, see [here](https://dubbo.apache.org/zh/docs3-v2/java-sdk/reference-manual/spi/description/filter/).
For more details of Dubbo filter, see [Dubbo filter documentation](https://cn.dubbo.apache.org/en/overview/mannual/java-sdk/tasks/extensibility/filter/).
## Dubbo resources

View File

@ -31,7 +31,7 @@ If you don't want the filters enabled, you can manually disable them. For exampl
<dubbo:provider filter="-sentinel.dubbo.provider.filter"/>
```
For more details of Dubbo filter, see [here](http://dubbo.apache.org/en-us/docs/dev/impls/filter.html).
For more details of Dubbo filter, see [Dubbo filter documentation](https://cn.dubbo.apache.org/en/overview/mannual/java-sdk/tasks/extensibility/filter/).
## Dubbo resources

View File

@ -35,7 +35,7 @@ or add setting in `rpc-config.json` file, and its priority is lower than above.
}
```
For more details of SOFARPC filter, see [here](https://www.sofastack.tech/projects/sofa-rpc/custom-filter/).
For more details of SOFARPC filter, see [SOFARPC filter documentation](https://www.sofastack.tech/projects/sofa-rpc/custom-filter/).
## SOFARPC resources

View File

@ -0,0 +1,54 @@
# Sentinel Spring Cloud Gateway Adapter
Sentinel provides integration module with Spring Cloud Gateway.
The integration module is based on the Sentinel Reactor Adapter.
> This module is quite similar to sentinel-spring-cloud-gateway-adapter and The difference is that this module has made some adaptations for webflux 6.x.
> The usage is consistent with sentinel-spring-cloud-gateway-adapter, with the difference being that the maven dependency is different.
Add the following dependency in `pom.xml` (if you are using Maven):
```xml
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-cloud-gateway-v6x-adapter</artifactId>
<version>x.y.z</version>
</dependency>
```
Then you only need to inject the corresponding `SentinelGatewayFilter` and `SentinelGatewayBlockExceptionHandler` instance
in Spring configuration. For example:
```java
@Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
@Bean
@Order(-1)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
// Register the block exception handler for Spring Cloud Gateway.
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}
@Bean
@Order(-1)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
}
```
The gateway adapter will regard all `routeId` (defined in Spring properties) and all customized API definitions
(defined in `GatewayApiDefinitionManager` of `sentinel-api-gateway-adapter-common` module) as resources.
You can register various customized callback in `GatewayCallbackManager`:
- `setBlockHandler`: register a customized `BlockRequestHandler` to handle the blocked request. The default implementation is `DefaultBlockRequestHandler`, which returns default message like `Blocked by Sentinel: FlowException`.

View File

@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-adapter</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.8.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sentinel-spring-cloud-gateway-v6x-adapter</artifactId>
<packaging>jar</packaging>
<properties>
<java.source.version>17</java.source.version>
<java.target.version>17</java.target.version>
<spring.cloud.gateway.version>4.3.0</spring.cloud.gateway.version>
<spring.boot.version>3.5.0</spring.boot.version>
<spring.version>6.2.7</spring.version>
<skip.spring.v6x.test>false</skip.spring.v6x.test>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-api-gateway-adapter-common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-reactor-adapter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gateway-server</artifactId>
<version>${spring.cloud.gateway.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<version>${spring.cloud.gateway.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>${spring.boot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.boot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven.surefire.version}</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>3.2.5</version>
</dependency>
</dependencies>
<configuration>
<skipTests>${skip.spring.v6x.test}</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,118 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.ResourceTypeConstants;
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
import com.alibaba.csp.sentinel.adapter.gateway.common.param.GatewayParamParser;
import com.alibaba.csp.sentinel.adapter.gateway.common.param.RequestItemParser;
import com.alibaba.csp.sentinel.adapter.gateway.sc.api.GatewayApiMatcherManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.api.matcher.WebExchangeApiMatcher;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.reactor.ContextConfig;
import com.alibaba.csp.sentinel.adapter.reactor.EntryConfig;
import com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorTransformer;
import com.alibaba.csp.sentinel.util.AssertUtil;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter, Ordered {
private final int order;
private final GatewayParamParser<ServerWebExchange> paramParser;
public SentinelGatewayFilter() {
this(Ordered.HIGHEST_PRECEDENCE);
}
public SentinelGatewayFilter(int order) {
this(order, new ServerWebExchangeItemParser());
}
public SentinelGatewayFilter(RequestItemParser<ServerWebExchange> serverWebExchangeItemParser) {
this(Ordered.HIGHEST_PRECEDENCE, serverWebExchangeItemParser);
}
public SentinelGatewayFilter(int order, RequestItemParser<ServerWebExchange> requestItemParser) {
AssertUtil.notNull(requestItemParser, "requestItemParser cannot be null");
this.order = order;
this.paramParser = new GatewayParamParser<>(requestItemParser);
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
Mono<Void> asyncResult = chain.filter(exchange);
if (route != null) {
String routeId = route.getId();
Object[] params = paramParser.parseParameterFor(routeId, exchange,
r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID);
String origin = Optional.ofNullable(GatewayCallbackManager.getRequestOriginParser())
.map(f -> f.apply(exchange))
.orElse("");
asyncResult = asyncResult.transform(
new SentinelReactorTransformer<>(new EntryConfig(routeId, ResourceTypeConstants.COMMON_API_GATEWAY,
EntryType.IN, 1, params, new ContextConfig(contextName(routeId), origin)))
);
}
Set<String> matchingApis = pickMatchingApiDefinitions(exchange);
for (String apiName : matchingApis) {
Object[] params = paramParser.parseParameterFor(apiName, exchange,
r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME);
asyncResult = asyncResult.transform(
new SentinelReactorTransformer<>(new EntryConfig(apiName, ResourceTypeConstants.COMMON_API_GATEWAY,
EntryType.IN, 1, params))
);
}
return asyncResult;
}
private String contextName(String route) {
return SentinelGatewayConstants.GATEWAY_CONTEXT_ROUTE_PREFIX + route;
}
Set<String> pickMatchingApiDefinitions(ServerWebExchange exchange) {
return GatewayApiMatcherManager.getApiMatcherMap().values()
.stream()
.filter(m -> m.test(exchange))
.map(WebExchangeApiMatcher::getApiName)
.collect(Collectors.toSet());
}
@Override
public int getOrder() {
return order;
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc;
import java.net.InetSocketAddress;
import java.util.Optional;
import com.alibaba.csp.sentinel.adapter.gateway.common.param.RequestItemParser;
import org.springframework.http.HttpCookie;
import org.springframework.web.server.ServerWebExchange;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class ServerWebExchangeItemParser implements RequestItemParser<ServerWebExchange> {
@Override
public String getPath(ServerWebExchange exchange) {
return exchange.getRequest().getPath().value();
}
@Override
public String getRemoteAddress(ServerWebExchange exchange) {
InetSocketAddress remoteAddress = exchange.getRequest().getRemoteAddress();
if (remoteAddress == null) {
return null;
}
return remoteAddress.getAddress().getHostAddress();
}
@Override
public String getHeader(ServerWebExchange exchange, String key) {
return exchange.getRequest().getHeaders().getFirst(key);
}
@Override
public String getUrlParam(ServerWebExchange exchange, String paramName) {
return exchange.getRequest().getQueryParams().getFirst(paramName);
}
@Override
public String getCookieValue(ServerWebExchange exchange, String cookieName) {
return Optional.ofNullable(exchange.getRequest().getCookies().getFirst(cookieName))
.map(HttpCookie::getValue)
.orElse(null);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.api;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
import com.alibaba.csp.sentinel.adapter.gateway.sc.api.matcher.WebExchangeApiMatcher;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public final class GatewayApiMatcherManager {
private static volatile Map<String, WebExchangeApiMatcher> API_MATCHER_MAP = new HashMap<>();
public static Map<String, WebExchangeApiMatcher> getApiMatcherMap() {
return Collections.unmodifiableMap(API_MATCHER_MAP);
}
public static Optional<WebExchangeApiMatcher> getMatcher(final String apiName) {
return Optional.ofNullable(apiName)
.map(e -> API_MATCHER_MAP.get(apiName));
}
public static Set<ApiDefinition> getApiDefinitionSet() {
return API_MATCHER_MAP.values()
.stream()
.map(WebExchangeApiMatcher::getApiDefinition)
.collect(Collectors.toSet());
}
static synchronized void loadApiDefinitions(/*@Valid*/ Set<ApiDefinition> definitions) {
Map<String, WebExchangeApiMatcher> apiMatcherMap = new HashMap<>();
for (ApiDefinition definition : definitions) {
apiMatcherMap.put(definition.getApiName(), new WebExchangeApiMatcher(definition));
}
API_MATCHER_MAP = apiMatcherMap;
}
private GatewayApiMatcherManager() {}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.api;
import java.util.Set;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinitionChangeObserver;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class SpringCloudGatewayApiDefinitionChangeObserver implements ApiDefinitionChangeObserver {
@Override
public void onChange(Set<ApiDefinition> apiDefinitions) {
GatewayApiMatcherManager.loadApiDefinitions(apiDefinitions);
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.api.matcher;
import java.util.Optional;
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.matcher.AbstractApiMatcher;
import com.alibaba.csp.sentinel.adapter.gateway.sc.route.RouteMatchers;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.function.Predicate;
import org.springframework.web.server.ServerWebExchange;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class WebExchangeApiMatcher extends AbstractApiMatcher<ServerWebExchange> {
public WebExchangeApiMatcher(ApiDefinition apiDefinition) {
super(apiDefinition);
}
@Override
protected void initializeMatchers() {
if (apiDefinition.getPredicateItems() != null) {
apiDefinition.getPredicateItems().forEach(item ->
fromApiPredicate(item).ifPresent(matchers::add));
}
}
private Optional<Predicate<ServerWebExchange>> fromApiPredicate(/*@NonNull*/ ApiPredicateItem item) {
if (item instanceof ApiPathPredicateItem) {
return fromApiPathPredicate((ApiPathPredicateItem)item);
}
return Optional.empty();
}
private Optional<Predicate<ServerWebExchange>> fromApiPathPredicate(/*@Valid*/ ApiPathPredicateItem item) {
String pattern = item.getPattern();
if (StringUtil.isBlank(pattern)) {
return Optional.empty();
}
switch (item.getMatchStrategy()) {
case SentinelGatewayConstants.URL_MATCH_STRATEGY_REGEX:
return Optional.of(RouteMatchers.regexPath(pattern));
case SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX:
return Optional.of(RouteMatchers.antPath(pattern));
default:
return Optional.of(RouteMatchers.exactPath(pattern));
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* Reactive handler for the blocked request.
*
* @author Eric Zhao
*/
@FunctionalInterface
public interface BlockRequestHandler {
/**
* Handle the blocked request.
*
* @param exchange server exchange object
* @param t block exception
* @return server response to return
*/
Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t);
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
import java.util.List;
import org.springframework.http.HttpStatus;
import org.springframework.http.InvalidMediaTypeException;
import org.springframework.http.MediaType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* The default implementation of {@link BlockRequestHandler}.
* Compatible with Spring WebFlux v6x and Spring Cloud Gateway.
*
* @author uuuyuqi
*/
public class DefaultBlockRequestHandler implements BlockRequestHandler {
private static final String DEFAULT_BLOCK_MSG_PREFIX = "Blocked by Sentinel: ";
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable ex) {
if (acceptsHtml(exchange)) {
return htmlErrorResponse(ex);
}
// JSON result by default.
return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(buildErrorResult(ex));
}
private Mono<ServerResponse> htmlErrorResponse(Throwable ex) {
return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
.contentType(MediaType.TEXT_PLAIN)
.bodyValue(DEFAULT_BLOCK_MSG_PREFIX + ex.getClass().getSimpleName());
}
private ErrorResult buildErrorResult(Throwable ex) {
return new ErrorResult(HttpStatus.TOO_MANY_REQUESTS.value(),
DEFAULT_BLOCK_MSG_PREFIX + ex.getClass().getSimpleName());
}
/**
* Reference from {@code DefaultErrorWebExceptionHandler} of Spring Boot.
*/
private boolean acceptsHtml(ServerWebExchange exchange) {
try {
List<MediaType> acceptedMediaTypes = exchange.getRequest().getHeaders().getAccept();
acceptedMediaTypes.remove(MediaType.ALL);
MimeTypeUtils.sortBySpecificity(acceptedMediaTypes);
return acceptedMediaTypes.stream()
.anyMatch(MediaType.TEXT_HTML::isCompatibleWith);
} catch (InvalidMediaTypeException ex) {
return false;
}
}
private static class ErrorResult {
private final int code;
private final String message;
ErrorResult(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
import java.util.function.Function;
import com.alibaba.csp.sentinel.util.AssertUtil;
import org.springframework.web.server.ServerWebExchange;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public final class GatewayCallbackManager {
private static final Function<ServerWebExchange, String> DEFAULT_ORIGIN_PARSER = (w) -> "";
/**
* BlockRequestHandler: (serverExchange, exception) -> response
*/
private static volatile BlockRequestHandler blockHandler = new DefaultBlockRequestHandler();
/**
* RequestOriginParser: (serverExchange) -> origin
*/
private static volatile Function<ServerWebExchange, String> requestOriginParser = DEFAULT_ORIGIN_PARSER;
public static /*@NonNull*/ BlockRequestHandler getBlockHandler() {
return blockHandler;
}
public static void resetBlockHandler() {
GatewayCallbackManager.blockHandler = new DefaultBlockRequestHandler();
}
public static void setBlockHandler(BlockRequestHandler blockHandler) {
AssertUtil.notNull(blockHandler, "blockHandler cannot be null");
GatewayCallbackManager.blockHandler = blockHandler;
}
public static /*@NonNull*/ Function<ServerWebExchange, String> getRequestOriginParser() {
return requestOriginParser;
}
public static void resetRequestOriginParser() {
GatewayCallbackManager.requestOriginParser = DEFAULT_ORIGIN_PARSER;
}
public static void setRequestOriginParser(Function<ServerWebExchange, String> requestOriginParser) {
AssertUtil.notNull(requestOriginParser, "requestOriginParser cannot be null");
GatewayCallbackManager.requestOriginParser = requestOriginParser;
}
private GatewayCallbackManager() {}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
import java.net.URI;
import com.alibaba.csp.sentinel.util.AssertUtil;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class RedirectBlockRequestHandler implements BlockRequestHandler {
private final URI uri;
public RedirectBlockRequestHandler(String url) {
AssertUtil.assertNotBlank(url, "url cannot be blank");
this.uri = URI.create(url);
}
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) {
return ServerResponse.temporaryRedirect(uri).build();
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.exception;
import java.util.List;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.util.function.Supplier;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import reactor.core.publisher.Mono;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class SentinelGatewayBlockExceptionHandler implements WebExceptionHandler {
private List<ViewResolver> viewResolvers;
private List<HttpMessageWriter<?>> messageWriters;
public SentinelGatewayBlockExceptionHandler(List<ViewResolver> viewResolvers, ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolvers;
this.messageWriters = serverCodecConfigurer.getWriters();
}
private Mono<Void> writeResponse(ServerResponse response, ServerWebExchange exchange) {
return response.writeTo(exchange, contextSupplier.get());
}
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
if (exchange.getResponse().isCommitted()) {
return Mono.error(ex);
}
// This exception handler only handles rejection by Sentinel.
if (!BlockException.isBlockException(ex)) {
return Mono.error(ex);
}
return handleBlockedRequest(exchange, ex)
.flatMap(response -> writeResponse(response, exchange));
}
private Mono<ServerResponse> handleBlockedRequest(ServerWebExchange exchange, Throwable throwable) {
return GatewayCallbackManager.getBlockHandler().handleRequest(exchange, throwable);
}
private final Supplier<ServerResponse.Context> contextSupplier = () -> new ServerResponse.Context() {
@Override
public List<HttpMessageWriter<?>> messageWriters() {
return SentinelGatewayBlockExceptionHandler.this.messageWriters;
}
@Override
public List<ViewResolver> viewResolvers() {
return SentinelGatewayBlockExceptionHandler.this.viewResolvers;
}
};
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.route;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.function.Predicate;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.PathMatcher;
import org.springframework.web.server.ServerWebExchange;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class AntRoutePathMatcher implements Predicate<ServerWebExchange> {
private final String pattern;
private final PathMatcher pathMatcher;
private final boolean canMatch;
public AntRoutePathMatcher(String pattern) {
AssertUtil.assertNotBlank(pattern, "pattern cannot be blank");
this.pattern = pattern;
this.pathMatcher = new AntPathMatcher();
this.canMatch = pathMatcher.isPattern(pattern);
}
@Override
public boolean test(ServerWebExchange exchange) {
String path = exchange.getRequest().getPath().value();
if (canMatch) {
return pathMatcher.match(pattern, path);
}
return false;
}
public String getPattern() {
return pattern;
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.route;
import java.util.regex.Pattern;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.function.Predicate;
import org.springframework.web.server.ServerWebExchange;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public class RegexRoutePathMatcher implements Predicate<ServerWebExchange> {
private final String pattern;
private final Pattern regex;
public RegexRoutePathMatcher(String pattern) {
AssertUtil.assertNotBlank(pattern, "pattern cannot be blank");
this.pattern = pattern;
this.regex = Pattern.compile(pattern);
}
@Override
public boolean test(ServerWebExchange exchange) {
String path = exchange.getRequest().getPath().value();
return regex.matcher(path).matches();
}
public String getPattern() {
return pattern;
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc.route;
import com.alibaba.csp.sentinel.util.function.Predicate;
import org.springframework.web.server.ServerWebExchange;
/**
* @author Eric Zhao
* @since 1.6.0
*/
public final class RouteMatchers {
public static Predicate<ServerWebExchange> all() {
return exchange -> true;
}
public static Predicate<ServerWebExchange> antPath(String pathPattern) {
return new AntRoutePathMatcher(pathPattern);
}
public static Predicate<ServerWebExchange> exactPath(final String path) {
return exchange -> exchange.getRequest().getPath().value().equals(path);
}
public static Predicate<ServerWebExchange> regexPath(String pathPattern) {
return new RegexRoutePathMatcher(pathPattern);
}
private RouteMatchers() {}
}

View File

@ -0,0 +1 @@
com.alibaba.csp.sentinel.adapter.gateway.sc.api.SpringCloudGatewayApiDefinitionChangeObserver

View File

@ -0,0 +1,91 @@
package com.alibaba.csp.sentinel.adapter.gateway.sc;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.http.server.RequestPath;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test cases for {@link SentinelGatewayFilter}.
*
* @author Eric Zhao
*/
public class SentinelGatewayFilterTest {
@Test
public void testPickMatchingApiDefinitions() {
// Mock a request.
ServerWebExchange exchange = mock(ServerWebExchange.class);
ServerHttpRequest request = mock(ServerHttpRequest.class);
when(exchange.getRequest()).thenReturn(request);
RequestPath requestPath = mock(RequestPath.class);
when(request.getPath()).thenReturn(requestPath);
// Prepare API definitions.
Set<ApiDefinition> apiDefinitions = new HashSet<>();
String apiName1 = "some_customized_api";
ApiDefinition api1 = new ApiDefinition(apiName1)
.setPredicateItems(Collections.singleton(
new ApiPathPredicateItem().setPattern("/product/**")
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX)
));
String apiName2 = "another_customized_api";
ApiDefinition api2 = new ApiDefinition(apiName2)
.setPredicateItems(new HashSet<ApiPredicateItem>() {{
add(new ApiPathPredicateItem().setPattern("/something"));
add(new ApiPathPredicateItem().setPattern("/other/**")
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
}});
apiDefinitions.add(api1);
apiDefinitions.add(api2);
GatewayApiDefinitionManager.loadApiDefinitions(apiDefinitions);
SentinelGatewayFilter filter = new SentinelGatewayFilter();
when(requestPath.value()).thenReturn("/product/123");
Set<String> matchingApis = filter.pickMatchingApiDefinitions(exchange);
assertThat(matchingApis.size()).isEqualTo(1);
assertThat(matchingApis.contains(apiName1)).isTrue();
when(requestPath.value()).thenReturn("/products");
assertThat(filter.pickMatchingApiDefinitions(exchange).size()).isZero();
when(requestPath.value()).thenReturn("/something");
matchingApis = filter.pickMatchingApiDefinitions(exchange);
assertThat(matchingApis.size()).isEqualTo(1);
assertThat(matchingApis.contains(apiName2)).isTrue();
when(requestPath.value()).thenReturn("/other/foo/3");
matchingApis = filter.pickMatchingApiDefinitions(exchange);
assertThat(matchingApis.size()).isEqualTo(1);
assertThat(matchingApis.contains(apiName2)).isTrue();
}
@Before
public void setUp() {
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
GatewayRuleManager.loadRules(new HashSet<>());
}
@After
public void tearDown() {
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
GatewayRuleManager.loadRules(new HashSet<>());
}
}

View File

@ -0,0 +1,209 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.gateway.sc;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager;
import com.alibaba.csp.sentinel.adapter.gateway.common.param.GatewayParamParser;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayParamFlowItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.RequestPath;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author Eric Zhao
*/
public class SpringCloudGatewayParamParserTest {
private final GatewayParamParser<ServerWebExchange> paramParser = new GatewayParamParser<>(
new ServerWebExchangeItemParser()
);
@Test
public void testParseParametersNoParamItem() {
// Mock a request.
ServerWebExchange exchange = mock(ServerWebExchange.class);
// Prepare gateway rules.
Set<GatewayFlowRule> rules = new HashSet<>();
String routeId1 = "my_test_route_A";
rules.add(new GatewayFlowRule(routeId1)
.setCount(5)
.setIntervalSec(1)
);
GatewayRuleManager.loadRules(rules);
Object[] params = paramParser.parseParameterFor(routeId1, exchange,
e -> e.getResourceMode() == 0);
assertThat(params.length).isEqualTo(1);
}
@Test
public void testParseParametersWithItems() {
// Mock a request.
ServerWebExchange exchange = mock(ServerWebExchange.class);
ServerHttpRequest request = mock(ServerHttpRequest.class);
when(exchange.getRequest()).thenReturn(request);
RequestPath requestPath = mock(RequestPath.class);
when(request.getPath()).thenReturn(requestPath);
// Prepare gateway rules.
Set<GatewayFlowRule> rules = new HashSet<>();
String routeId1 = "my_test_route_A";
String api1 = "my_test_route_B";
String headerName = "X-Sentinel-Flag";
String paramName = "p";
GatewayFlowRule routeRule1 = new GatewayFlowRule(routeId1)
.setCount(2)
.setIntervalSec(2)
.setBurst(2)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_CLIENT_IP)
);
GatewayFlowRule routeRule2 = new GatewayFlowRule(routeId1)
.setCount(10)
.setIntervalSec(1)
.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)
.setMaxQueueingTimeoutMs(600)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HEADER)
.setFieldName(headerName)
);
GatewayFlowRule routeRule3 = new GatewayFlowRule(routeId1)
.setCount(20)
.setIntervalSec(1)
.setBurst(5)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)
.setFieldName(paramName)
);
GatewayFlowRule routeRule4 = new GatewayFlowRule(routeId1)
.setCount(120)
.setIntervalSec(10)
.setBurst(30)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HOST)
);
GatewayFlowRule apiRule1 = new GatewayFlowRule(api1)
.setResourceMode(SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME)
.setCount(5)
.setIntervalSec(1)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)
.setFieldName(paramName)
);
rules.add(routeRule1);
rules.add(routeRule2);
rules.add(routeRule3);
rules.add(routeRule4);
rules.add(apiRule1);
GatewayRuleManager.loadRules(rules);
String expectedHost = "hello.test.sentinel";
String expectedAddress = "66.77.88.99";
String expectedHeaderValue1 = "Sentinel";
String expectedUrlParamValue1 = "17";
mockClientHostAddress(request, expectedAddress);
Map<String, String> expectedHeaders = new HashMap<String, String>() {{
put(headerName, expectedHeaderValue1); put("Host", expectedHost);
}};
mockHeaders(request, expectedHeaders);
mockSingleUrlParam(request, paramName, expectedUrlParamValue1);
Object[] params = paramParser.parseParameterFor(routeId1, exchange, e -> e.getResourceMode() == 0);
assertThat(params.length).isEqualTo(4);
assertThat(params[routeRule1.getParamItem().getIndex()]).isEqualTo(expectedAddress);
assertThat(params[routeRule2.getParamItem().getIndex()]).isEqualTo(expectedHeaderValue1);
assertThat(params[routeRule3.getParamItem().getIndex()]).isEqualTo(expectedUrlParamValue1);
assertThat(params[routeRule4.getParamItem().getIndex()]).isEqualTo(expectedHost);
assertThat(paramParser.parseParameterFor(api1, exchange, e -> e.getResourceMode() == 0).length).isZero();
String expectedUrlParamValue2 = "fs";
mockSingleUrlParam(request, paramName, expectedUrlParamValue2);
params = paramParser.parseParameterFor(api1, exchange, e -> e.getResourceMode() == 1);
assertThat(params.length).isEqualTo(1);
assertThat(params[apiRule1.getParamItem().getIndex()]).isEqualTo(expectedUrlParamValue2);
}
private void mockClientHostAddress(/*@Mock*/ ServerHttpRequest request, String address) {
InetSocketAddress socketAddress = mock(InetSocketAddress.class);
when(request.getRemoteAddress()).thenReturn(socketAddress);
InetAddress inetAddress = mock(InetAddress.class);
when(inetAddress.getHostAddress()).thenReturn(address);
when(socketAddress.getAddress()).thenReturn(inetAddress);
}
private void mockHeaders(/*@Mock*/ ServerHttpRequest request, Map<String, String> headerMap) {
HttpHeaders headers = mock(HttpHeaders.class);
when(request.getHeaders()).thenReturn(headers);
for (Map.Entry<String, String> e : headerMap.entrySet()) {
when(headers.getFirst(e.getKey())).thenReturn(e.getValue());
}
}
@SuppressWarnings("unchecked")
private void mockUrlParams(/*@Mock*/ ServerHttpRequest request, Map<String, String> paramMap) {
MultiValueMap<String, String> urlParams = mock(MultiValueMap.class);
when(request.getQueryParams()).thenReturn(urlParams);
for (Map.Entry<String, String> e : paramMap.entrySet()) {
when(urlParams.getFirst(e.getKey())).thenReturn(e.getValue());
}
}
@SuppressWarnings("unchecked")
private void mockSingleUrlParam(/*@Mock*/ ServerHttpRequest request, String key, String value) {
MultiValueMap<String, String> urlParams = mock(MultiValueMap.class);
when(request.getQueryParams()).thenReturn(urlParams);
when(urlParams.getFirst(key)).thenReturn(value);
}
private void mockSingleHeader(/*@Mock*/ ServerHttpRequest request, String key, String value) {
HttpHeaders headers = mock(HttpHeaders.class);
when(request.getHeaders()).thenReturn(headers);
when(headers.getFirst(key)).thenReturn(value);
}
@Before
public void setUp() {
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
GatewayRuleManager.loadRules(new HashSet<>());
}
@After
public void tearDown() {
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
GatewayRuleManager.loadRules(new HashSet<>());
}
}

View File

@ -17,7 +17,7 @@
<spring.boot.version>3.0.0</spring.boot.version>
<servlet.api.version>6.0.0</servlet.api.version>
<jakarta.xml.bind-api.version>4.0.0</jakarta.xml.bind-api.version>
<slf4j-api.version>2.0.4</slf4j-api.version>
<slf4j-api.version>2.0.16</slf4j-api.version>
<skip.spring.v6x.test>false</skip.spring.v6x.test>
</properties>

View File

@ -16,7 +16,13 @@
package com.alibaba.csp.sentinel.cluster.client;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
@ -35,6 +41,7 @@ import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
/**
* Default implementation of {@link ClusterTokenClient}.
@ -43,11 +50,30 @@ import com.alibaba.csp.sentinel.util.StringUtil;
* @since 1.4.0
*/
public class DefaultClusterTokenClient implements ClusterTokenClient {
public class CachedTokenData {
AtomicInteger count;
AtomicInteger lastStatus;
AtomicLong lastWaitUntilMs;
AtomicInteger lastWaitPrefetchCnt;
AtomicInteger lastRemaining;
public CachedTokenData() {
count = new AtomicInteger(0);
lastStatus = new AtomicInteger(TokenResultStatus.OK);
lastWaitUntilMs = new AtomicLong(0);
lastWaitPrefetchCnt = new AtomicInteger(0);
lastRemaining = new AtomicInteger(0);
}
}
private ClusterTransportClient transportClient;
private TokenServerDescriptor serverDescriptor;
private final AtomicBoolean shouldStart = new AtomicBoolean(false);
private int checkInterval = 2;
ConcurrentHashMap<Long, CachedTokenData> localPrefetchedTokens = new ConcurrentHashMap<>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private final ScheduledExecutorService prefetchScheduler = Executors.newScheduledThreadPool(2,
new NamedThreadFactory("sentinel-cluster-prefetch-scheduler", true));
public DefaultClusterTokenClient() {
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
@ -146,6 +172,141 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
return serverDescriptor;
}
public void setInterval(int val) {
checkInterval = val;
}
public void resetCache() {
localPrefetchedTokens.clear();
}
public int currentRuleCached(Long flowId) {
CachedTokenData d = localPrefetchedTokens.get(flowId);
if (d == null) {
return 0;
}
return d.count.get();
}
private void preFetch(Long flowId, CachedTokenData value, int prefetchCnt) {
long waitUntil = value.lastWaitUntilMs.get();
if (waitUntil > 0 && System.currentTimeMillis() < waitUntil) {
return;
}
if (waitUntil > 0) {
value.count.addAndGet(value.lastWaitPrefetchCnt.get());
value.lastStatus.set(TokenResultStatus.OK);
value.lastWaitUntilMs.set(0);
value.lastWaitPrefetchCnt.set(0);
}
int current = value.count.get();
if (current >= prefetchCnt / 2) {
return;
}
if (current < -1 * prefetchCnt) {
// avoid too much prefetch
current = -1 * prefetchCnt;
}
prefetchCnt = prefetchCnt - current;
TokenResult fetched = requestToken(flowId, prefetchCnt, true);
value.lastWaitUntilMs.set(0);
value.lastStatus.set(fetched.getStatus());
value.lastRemaining.set(fetched.getRemaining());
if (fetched.getStatus() == TokenResultStatus.OK) {
value.count.addAndGet(prefetchCnt);
} else if (fetched.getStatus() == TokenResultStatus.SHOULD_WAIT) {
value.lastWaitUntilMs.set(System.currentTimeMillis() + fetched.getWaitInMs());
value.lastWaitPrefetchCnt.set(prefetchCnt);
}
}
private TokenResult tryLocalCachedToken(CachedTokenData data, int acquireCount, int prefetchCnt) {
int count = data.count.get();
TokenResult ret = new TokenResult(data.lastStatus.get());
ret.setFromCached(true);
ret.setRemaining(data.lastRemaining.get());
if (count >= acquireCount) {
// here we allow the concurrency which may cause decrease to negative count, it
// is just skipped some requests
// and it will be refilled by the bg prefetch in next round.
data.count.addAndGet(-1 * acquireCount);
ret.setStatus(TokenResultStatus.OK);
return ret;
}
if (acquireCount > prefetchCnt) {
return null;
}
if (ret.getStatus() == TokenResultStatus.SHOULD_WAIT) {
int newN = data.count.addAndGet(-1 * acquireCount);
if (newN + data.lastWaitPrefetchCnt.get() < -1 * prefetchCnt) {
data.count.addAndGet(acquireCount);
if (acquireCount <= prefetchCnt / 2) {
// since last status is still waiting, we should not block directly, make it failover to local
ret.setStatus(TokenResultStatus.FAIL);
return ret;
}
// for the large acquireCount, we can try remote again, since large request will
// much slower which will have less pressure to remote
return null;
}
int waitMs = (int) (data.lastWaitUntilMs.get() - System.currentTimeMillis());
if (waitMs > 0) {
ret.setWaitInMs(waitMs);
}
return ret;
} else if (ret.getStatus() == TokenResultStatus.OK) {
// last ok, but the cached count is not enough, we can preuse it to avoid remote
// request too often,
// otherwise just try remote request
int newN = data.count.addAndGet(-1 * acquireCount);
if (newN < -1 * prefetchCnt * 2) {
// preuse failed since not enough, added it back
data.count.addAndGet(acquireCount);
if (acquireCount <= prefetchCnt / 2) {
// since last is still ok, we should not block directly, make it failover to local
ret.setStatus(TokenResultStatus.FAIL);
return ret;
}
// for the large acquireCount, we can try remote again, since large request will much slower which will have less pressure to remote
return null;
}
// preuse ok
return ret;
} else {
// should fail directly
return ret;
}
}
@Override
public TokenResult requestTokenWithCache(Long flowId, int acquireCount, int prefetchCnt) {
if (notValidRequest(flowId, acquireCount)) {
return badRequest();
}
// try local prefetched first
CachedTokenData data = localPrefetchedTokens.get(flowId);
if (data != null) {
TokenResult ret = tryLocalCachedToken(data, acquireCount, prefetchCnt);
if (ret != null) {
return ret;
}
} else {
localPrefetchedTokens.computeIfAbsent(flowId, k -> {
CachedTokenData v = new CachedTokenData();
prefetchScheduler.scheduleAtFixedRate(() -> {
try {
preFetch(flowId, v, prefetchCnt);
} catch (Throwable e) {
RecordLog.info("[DefaultClusterTokenClient] prefetch failed for flowId {}", flowId, e);
}
}, 0, checkInterval, TimeUnit.MILLISECONDS);
return v;
});
}
// fallback to remote request
return requestToken(flowId, acquireCount, true);
}
@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
if (notValidRequest(flowId, acquireCount)) {

View File

@ -0,0 +1,432 @@
package com.alibaba.csp.sentinel.cluster.client;
import org.junit.Test;
import org.mockito.Spy;
import org.junit.runner.RunWith;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.runners.MethodSorters;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.doReturn;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import org.mockito.junit.MockitoJUnitRunner;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
@RunWith(MockitoJUnitRunner.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class DefaultClusterTokenClientTest {
@Spy
DefaultClusterTokenClient client = new DefaultClusterTokenClient();
final int testInterval = 60;
@Test
public void testClientCacheWithRemoteOK() throws Exception {
client.setInterval(testInterval);
client.resetCache();
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
int prefetch = 10;
TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
// first should remote
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long)TokenResultStatus.OK, (long)ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Thread.sleep(testInterval);
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
for (int i = 0;i < prefetch * 3; i++) {
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
}
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
ret = client.requestTokenWithCache(1L, cnt, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
}
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Thread.sleep(testInterval * 2);
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
// should refill prefetch * 2 in once to make sure we have at least prefetch count in cache
ret = client.requestTokenWithCache(1L, prefetch, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Assert.assertEquals(0, client.currentRuleCached(1L));
Thread.sleep(testInterval);
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
ret = client.requestTokenWithCache(1L, prefetch / 2, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Assert.assertEquals(prefetch / 2, client.currentRuleCached(1L));
Thread.sleep(testInterval);
Assert.assertEquals(prefetch / 2, client.currentRuleCached(1L));
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
// use less than half will not refill, so cache is not enough
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Assert.assertEquals(prefetch / 2 - 1, client.currentRuleCached(1L));
// refill at least prefetch at once, so we can get at most 1.5 * prefetch in cache
Thread.sleep(testInterval);
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
ret = client.requestTokenWithCache(1L, prefetch, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Assert.assertEquals(0, client.currentRuleCached(1L));
}
@Test
public void testClientCacheWithRemoteWait() throws Exception {
client.setInterval(testInterval);
client.resetCache();
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
int prefetch = 10;
TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
// first should remote
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long)TokenResultStatus.OK, (long)ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Thread.sleep(testInterval);
TokenResult waitResult = new TokenResult(TokenResultStatus.SHOULD_WAIT);
waitResult.setWaitInMs(testInterval * 4);
doReturn(waitResult).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
for (int i = 0;i < prefetch * 3; i++) {
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
}
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
ret = client.requestTokenWithCache(1L, cnt, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
}
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
Thread.sleep(testInterval * 2);
// prefetch count will be 2 * prefetch, and last status became should wait
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
// refill will be waited until the timeout
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 3 <= ret.getWaitInMs());
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval > ret.getWaitInMs());
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 3 <= ret.getWaitInMs());
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval > ret.getWaitInMs());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
Thread.sleep(testInterval);
Assert.assertEquals(-1 * prefetch * 2 - 1 - prefetch / 2 - 1, client.currentRuleCached(1L));
// wait the timeout
Thread.sleep(waitResult.getWaitInMs());
// the prefetch count should be added to the count
Assert.assertEquals(- 1 - prefetch / 2 - 1, client.currentRuleCached(1L));
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
Thread.sleep(waitResult.getWaitInMs());
Thread.sleep(testInterval);
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
doReturn(waitResult).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
ret = client.requestTokenWithCache(1L, prefetch - 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Assert.assertEquals(1, client.currentRuleCached(1L));
Thread.sleep(testInterval);
// refill will be waiting and the last state became waiting, but local has some tokens
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Assert.assertEquals(0, client.currentRuleCached(1L));
ret = client.requestTokenWithCache(1L, prefetch, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 2 <= ret.getWaitInMs());
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() > ret.getWaitInMs());
ret = client.requestTokenWithCache(1L, prefetch - 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
Assert.assertEquals(-1 * prefetch * 2 + 1, client.currentRuleCached(1L));
Thread.sleep(testInterval);
Assert.assertEquals(-1 * prefetch * 2 + 1, client.currentRuleCached(1L));
// refill will be waiting
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
Thread.sleep(waitResult.getWaitInMs() + testInterval);
// the prefetch count should be added to the count
Assert.assertEquals(-1 * prefetch, client.currentRuleCached(1L));
Thread.sleep(waitResult.getWaitInMs() + testInterval);
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
}
@Test
public void testClientCacheWithRemoteBlocked() throws Exception {
client.setInterval(testInterval);
client.resetCache();
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
int prefetch = 10;
TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
// first should remote
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long)TokenResultStatus.OK, (long)ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Thread.sleep(testInterval);
// begin test while remote refused
doReturn(new TokenResult(TokenResultStatus.BLOCKED)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
for (int i = 0;i < prefetch * 3; i++) {
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
}
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
ret = client.requestTokenWithCache(1L, cnt, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
}
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
Thread.sleep(testInterval);
// refill will be blocked
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
Thread.sleep(testInterval * 2);
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
doReturn(new TokenResult(TokenResultStatus.BLOCKED)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
for (int i = 0;i < prefetch / 2 + 1; i++) {
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
}
Thread.sleep(testInterval);
Assert.assertEquals(prefetch - prefetch / 2 - 1, client.currentRuleCached(1L));
// refill will be blocked and the last state became blocked, but local has some tokens
ret = client.requestTokenWithCache(1L, prefetch - prefetch / 2 - 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
Assert.assertEquals(0, client.currentRuleCached(1L));
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
ret = client.requestTokenWithCache(1L, cnt, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
}
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
Thread.sleep(testInterval);
// refill will be blocked
ret = client.requestTokenWithCache(1L, 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
Assert.assertTrue(ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
Assert.assertEquals(0, ret.getWaitInMs());
}
@Test
public void testConcurrencyRequestClientCache() throws Exception {
client.setInterval(1);
client.resetCache();
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
int prefetch = 200;
ScheduledExecutorService testScheduler = Executors.newScheduledThreadPool(16,
new NamedThreadFactory("test-scheduler", true));
AtomicInteger blocked = new AtomicInteger();
AtomicInteger failed = new AtomicInteger();
AtomicInteger ok = new AtomicInteger();
AtomicInteger cached = new AtomicInteger();
AtomicInteger notCached = new AtomicInteger();
AtomicBoolean stopped = new AtomicBoolean(false);
for (int concurrency = 0; concurrency < 8; concurrency++) {
testScheduler.submit(() -> {
System.out.println("running begin");
for (int loop = 0; loop < 200; loop++) {
for (int cnt = 1; cnt < prefetch * 2; cnt++) {
TokenResult ret = client.requestTokenWithCache(1L, cnt, prefetch);
if (cnt > prefetch * 1.5) {
Assert.assertTrue(!ret.isFromCached());
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
notCached.incrementAndGet();
} else {
if (ret.getStatus() == TokenResultStatus.BLOCKED) {
Assert.assertTrue(ret.isFromCached());
blocked.incrementAndGet();
cached.incrementAndGet();
} else if (ret.getStatus() == TokenResultStatus.FAIL) {
Assert.assertTrue(ret.isFromCached());
failed.incrementAndGet();
cached.incrementAndGet();
} else {
ok.incrementAndGet();
if (ret.isFromCached()) {
cached.incrementAndGet();
} else {
notCached.incrementAndGet();
}
}
}
Assert.assertEquals(0, ret.getWaitInMs());
if (cnt % 50 == 0) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
}
if (stopped.get()) {
break;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {}
}
System.out.println("running done");
});
}
testScheduler.submit(() -> {
for (; !stopped.get() ;) {
System.out.println("current rule cached: " + client.currentRuleCached(1L));
System.out.println("current failed: " + failed.get() + ", passed: " + ok.get() + ", cached: " + cached.get() + ", not cached: " + notCached.get());
try {
Thread.sleep(3);
} catch (InterruptedException e) {
}
}
});
Thread.sleep(2000);
stopped.set(true);
testScheduler.shutdown();
testScheduler.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("current rule cached: " + client.currentRuleCached(1L));
System.out.println("current failed: " + failed.get() + ", passed: " + ok.get() + ", cached: " + cached.get()
+ ", not cached: " + notCached.get());
Assert.assertTrue(blocked.get() + failed.get() < cached.get());
Assert.assertTrue(cached.get() + notCached.get() > blocked.get() + ok.get() + failed.get());
Assert.assertTrue(failed.get() > 0);
Assert.assertTrue(ok.get() > 0);
Assert.assertTrue(client.currentRuleCached(1L) >= prefetch / 2);
}
}

View File

@ -37,13 +37,14 @@ final class ClusterFlowChecker {
private static double calcGlobalThreshold(FlowRule rule) {
double count = rule.getCount();
double preCnt = rule.getClusterConfig().getPrefetchCntRatio() * count;
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
return count + preCnt;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
return count * connectedCount;
return count * connectedCount + preCnt;
}
}

View File

@ -35,6 +35,11 @@ import java.util.Collection;
@Spi(isDefault = true)
public class DefaultTokenService implements TokenService {
@Override
public TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt) {
return requestToken(ruleId, acquireCount, true);
}
@Override
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
if (notValidRequest(ruleId, acquireCount)) {

View File

@ -43,6 +43,11 @@ public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {
server.stop();
}
@Override
public TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt) {
return requestToken(ruleId, acquireCount, true);
}
@Override
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
if (tokenService != null) {

View File

@ -34,6 +34,8 @@ public class TokenResult {
private Map<String, String> attachments;
private boolean isFromCached;
public TokenResult() {
}
@ -85,6 +87,15 @@ public class TokenResult {
return this;
}
public boolean isFromCached() {
return isFromCached;
}
public TokenResult setFromCached(boolean fromCached) {
isFromCached = fromCached;
return this;
}
@Override
public String toString() {
return "TokenResult{" +
@ -93,6 +104,7 @@ public class TokenResult {
", waitInMs=" + waitInMs +
", attachments=" + attachments +
", tokenId=" + tokenId +
", isFromCached=" + isFromCached +
'}';
}
}

View File

@ -35,6 +35,15 @@ public interface TokenService {
*/
TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized);
/**
* Request tokens from cache and remote token server.
*
* @param ruleId the unique rule ID
* @param acquireCount token count to acquire
* @return result of the token request
*/
TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt);
/**
* Request tokens for a specific parameter from remote token server.
*

View File

@ -118,6 +118,12 @@ public class DefaultNode extends StatisticNode {
this.clusterNode.increaseExceptionQps(count);
}
@Override
public void increaseFallbackQps(int count) {
super.increaseFallbackQps(count);
this.clusterNode.increaseFallbackQps(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
super.addRtAndSuccess(rt, successCount);

View File

@ -67,6 +67,8 @@ public interface Node extends OccupySupport, DebugSupport {
* @return total business exception count per minute
*/
long totalException();
long totalFallback();
/**
* Get pass request per second.
@ -109,6 +111,8 @@ public interface Node extends OccupySupport, DebugSupport {
* @return QPS of exception occurs
*/
double exceptionQps();
double fallbackQps();
/**
* Get average rt per second.
@ -186,6 +190,8 @@ public interface Node extends OccupySupport, DebugSupport {
*/
void increaseExceptionQps(int count);
void increaseFallbackQps(int count);
/**
* Increase current thread count.
*/

View File

@ -149,6 +149,7 @@ public class StatisticNode implements Node {
@Override
public void reset() {
rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
}
@Override
@ -196,6 +197,16 @@ public class StatisticNode implements Node {
return rollingCounterInMinute.exception();
}
@Override
public double fallbackQps() {
return rollingCounterInSecond.fallback() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public long totalFallback() {
return rollingCounterInMinute.fallback();
}
@Override
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
@ -269,6 +280,12 @@ public class StatisticNode implements Node {
rollingCounterInMinute.addException(count);
}
@Override
public void increaseFallbackQps(int count) {
rollingCounterInSecond.addFallback(count);
rollingCounterInMinute.addFallback(count);
}
@Override
public void increaseThreadNum() {
curThreadNum.increment();

View File

@ -15,9 +15,10 @@
*/
package com.alibaba.csp.sentinel.node.metric;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
/**
* Metrics data for a specific resource at given {@code timestamp}.
@ -210,12 +211,19 @@ public class MetricNode {
*
* @return string format of this.
*/
private static final DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public String toFatString() {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
StringBuilder sb = new StringBuilder(32);
sb.delete(0, sb.length());
sb.append(getTimestamp()).append("|");
sb.append(df.format(new Date(getTimestamp()))).append("|");
long timestamp = getTimestamp();
sb.append(timestamp).append("|");
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
sb.append(df.format(dateTime)).append("|");
String legalName = getResource().replaceAll("\\|", "_");
sb.append(legalName).append("|");
sb.append(getPassQps()).append("|");

View File

@ -73,6 +73,27 @@ public class ClusterFlowConfig {
*/
private long clientOfflineTime = 2000;
// prefetch ratio if used to calculate prefetch count (ratio * count) for each request, prefetch can be used
// to reduce the number of requests to the server and reduce the latency for the request.
// 0: disable prefetch.
// value should be in the range [0.001 ~ 0.1]
// We suggest disable prefetch if the limit count is less than 100,
// for count <100, suggest ratio 0.1
// for count 100~1000, suggest ratio 0.06
// for count 1000~10000, suggest ratio 0.04
// for count 10000~100000, suggest ratio 0.01
// for count 100000~1M, suggest ratio 0.01~0.002
// note the prefetch count should less than 50% of the avg count of each client (cluster total count / nums of clients).
private double prefetchCntRatio = 0;
public double getPrefetchCntRatio() {
return prefetchCntRatio;
}
public void setPrefetchCntRatio(double prefetchCntRatio) {
this.prefetchCntRatio = prefetchCntRatio;
}
public long getResourceTimeout() {
return resourceTimeout;
}
@ -197,6 +218,9 @@ public class ClusterFlowConfig {
if (acquireRefuseStrategy != that.acquireRefuseStrategy) {
return false;
}
if (Double.compare(prefetchCntRatio, that.prefetchCntRatio) != 0) {
return false;
}
return Objects.equals(flowId, that.flowId);
}
@ -212,6 +236,7 @@ public class ClusterFlowConfig {
result = (int) (31 * result + clientOfflineTime);
result = 31 * result + resourceTimeoutStrategy;
result = 31 * result + acquireRefuseStrategy;
result = (int) (31 * result + prefetchCntRatio);
return result;
}
@ -228,6 +253,7 @@ public class ClusterFlowConfig {
", resourceTimeoutStrategy=" + resourceTimeoutStrategy +
", acquireRefuseStrategy=" + acquireRefuseStrategy +
", clientOfflineTime=" + clientOfflineTime +
", prefetchCntRatio=" + prefetchCntRatio +
'}';
}
}

View File

@ -152,8 +152,15 @@ public class FlowRuleChecker {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
long flowId = rule.getClusterConfig().getFlowId();
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
double ratio = rule.getClusterConfig().getPrefetchCntRatio();
int prefetchCnt = (int)(ratio * rule.getCount());
if (prefetchCnt > 1) {
TokenResult result = clusterService.requestTokenWithCache(flowId, acquireCount, prefetchCnt);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
} else {
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
}
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
@ -166,6 +173,7 @@ public class FlowRuleChecker {
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
node.increaseFallbackQps(acquireCount);
return passLocalCheck(rule, context, node, acquireCount, prioritized);
} else {
// The rule won't be activated, just pass.
@ -192,7 +200,11 @@ public class FlowRuleChecker {
case TokenResultStatus.SHOULD_WAIT:
// Wait for next tick.
try {
Thread.sleep(result.getWaitInMs());
int waitMs = result.getWaitInMs();
if (waitMs > rule.getMaxQueueingTimeMs()) {
waitMs = rule.getMaxQueueingTimeMs();
}
Thread.sleep(waitMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
@ -207,4 +219,4 @@ public class FlowRuleChecker {
return false;
}
}
}
}

View File

@ -35,5 +35,7 @@ public enum MetricEvent {
/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
OCCUPIED_PASS,
// fallback pass in cluster mode
FALLBACK
}

View File

@ -91,6 +91,10 @@ public class MetricBucket {
return get(MetricEvent.EXCEPTION);
}
public long fallback() {
return get(MetricEvent.FALLBACK);
}
public long rt() {
return get(MetricEvent.RT);
}
@ -115,6 +119,10 @@ public class MetricBucket {
add(MetricEvent.EXCEPTION, n);
}
public void addFallback(int n) {
add(MetricEvent.FALLBACK, n);
}
public void addBlock(int n) {
add(MetricEvent.BLOCK, n);
}

View File

@ -93,6 +93,17 @@ public class ArrayMetric implements Metric {
return exception;
}
@Override
public long fallback() {
data.currentWindow();
long fallback = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
fallback += window.fallback();
}
return fallback;
}
@Override
public long block() {
data.currentWindow();
@ -216,6 +227,12 @@ public class ArrayMetric implements Metric {
wrap.value().addException(count);
}
@Override
public void addFallback(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addFallback(count);
}
@Override
public void addBlock(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();

View File

@ -50,6 +50,13 @@ public interface Metric extends DebugSupport {
*/
long exception();
/**
* Get total fallback count.
*
* @return fallback count
*/
long fallback();
/**
* Get total block count.
*
@ -108,6 +115,13 @@ public interface Metric extends DebugSupport {
*/
void addException(int n);
/**
* Add current fallback count.
*
* @param n count to add
*/
void addFallback(int n);
/**
* Add current block count.
*

View File

@ -156,7 +156,7 @@ public class EagleEyeCoreUtilsTest {
@Test
public void testFormatTime() {
Assert.assertEquals("2019-06-15 12:13:14.000",
EagleEyeCoreUtils.formatTime(1560600794000L - TimeZone.getDefault().getRawOffset()));
EagleEyeCoreUtils.formatTime(1560600794000L - TimeZone.getDefault().getOffset(1560600794000L)));
}
@Test

View File

@ -66,4 +66,4 @@ public class SentinelAspectConfiguration {
}
```
An example for using Sentinel Annotation AspectJ with Spring Boot can be found [here](https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-annotation-spring-aop).
An example for using Sentinel Annotation AspectJ with Spring Boot can be found in [sentinel-demo/sentinel-demo-annotation-spring-aop](https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-annotation-spring-aop).

View File

@ -37,11 +37,6 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

View File

@ -39,9 +39,11 @@ public class NodeVo {
private Long averageRt;
private Long successQps;
private Long exceptionQps;
private Long fallbackQps;
private Long oneMinutePass;
private Long oneMinuteBlock;
private Long oneMinuteException;
private Long oneMinuteFallback;
private Long oneMinuteTotal;
private Long timestamp;
@ -69,7 +71,9 @@ public class NodeVo {
vo.averageRt = (long) node.avgRt();
vo.successQps = (long) node.successQps();
vo.exceptionQps = (long) node.exceptionQps();
vo.fallbackQps = (long) node.fallbackQps();
vo.oneMinuteException = node.totalException();
vo.oneMinuteFallback = node.totalFallback();
vo.oneMinutePass = node.totalRequest() - node.blockRequest();
vo.oneMinuteBlock = node.blockRequest();
vo.oneMinuteTotal = node.totalRequest();
@ -108,7 +112,9 @@ public class NodeVo {
vo.averageRt = (long) node.avgRt();
vo.successQps = (long) node.successQps();
vo.exceptionQps = (long) node.exceptionQps();
vo.fallbackQps = (long) node.fallbackQps();
vo.oneMinuteException = node.totalException();
vo.oneMinuteFallback = node.totalFallback();
vo.oneMinutePass = node.totalRequest() - node.blockRequest();
vo.oneMinuteBlock = node.blockRequest();
vo.oneMinuteTotal = node.totalRequest();
@ -204,6 +210,22 @@ public class NodeVo {
this.oneMinuteException = oneMinuteException;
}
public Long getFallbackQps() {
return fallbackQps;
}
public void setFallbackQps(Long fallbackQps) {
this.fallbackQps = fallbackQps;
}
public Long getOneMinuteFallback() {
return oneMinuteFallback;
}
public void setOneMinuteFallback(Long oneMinuteFallback) {
this.oneMinuteFallback = oneMinuteFallback;
}
public Long getOneMinutePass() {
return oneMinutePass;
}