mirror of https://github.com/alibaba/Sentinel.git
Compare commits
12 Commits
6a0bdc6852
...
0ea2d37ff6
| Author | SHA1 | Date |
|---|---|---|
|
|
0ea2d37ff6 | |
|
|
4a419818af | |
|
|
d9398b4f75 | |
|
|
4cad353dda | |
|
|
ca596457b0 | |
|
|
9fa818cbc9 | |
|
|
5473ad7b3e | |
|
|
493f90d6ba | |
|
|
dba80f8719 | |
|
|
de16134388 | |
|
|
6a149f7091 | |
|
|
109c3b4c6d |
|
|
@ -27,6 +27,7 @@
|
|||
<module>sentinel-spring-webflux-adapter</module>
|
||||
<module>sentinel-api-gateway-adapter-common</module>
|
||||
<module>sentinel-spring-cloud-gateway-adapter</module>
|
||||
<module>sentinel-spring-cloud-gateway-v6x-adapter</module>
|
||||
<module>sentinel-web-adapter-common</module>
|
||||
<module>sentinel-spring-webmvc-adapter</module>
|
||||
<module>sentinel-spring-webmvc-v6x-adapter</module>
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ If you don't want the filters enabled, you can manually disable them. For exampl
|
|||
<dubbo:provider filter="-sentinel.dubbo.provider.filter"/>
|
||||
```
|
||||
|
||||
For more details of Dubbo filter, see [here](http://dubbo.apache.org/en-us/docs/dev/impls/filter.html).
|
||||
For more details of Dubbo filter, see [Dubbo filter documentation](https://cn.dubbo.apache.org/en/overview/mannual/java-sdk/tasks/extensibility/filter/).
|
||||
|
||||
## Dubbo resources
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ If you don't want the filters enabled, you can manually disable them. For exampl
|
|||
<dubbo:provider filter="-sentinel.dubbo.provider.filter"/>
|
||||
```
|
||||
|
||||
For more details of Dubbo filter, see [here](https://dubbo.apache.org/zh/docs3-v2/java-sdk/reference-manual/spi/description/filter/).
|
||||
For more details of Dubbo filter, see [Dubbo filter documentation](https://cn.dubbo.apache.org/en/overview/mannual/java-sdk/tasks/extensibility/filter/).
|
||||
|
||||
## Dubbo resources
|
||||
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ If you don't want the filters enabled, you can manually disable them. For exampl
|
|||
<dubbo:provider filter="-sentinel.dubbo.provider.filter"/>
|
||||
```
|
||||
|
||||
For more details of Dubbo filter, see [here](http://dubbo.apache.org/en-us/docs/dev/impls/filter.html).
|
||||
For more details of Dubbo filter, see [Dubbo filter documentation](https://cn.dubbo.apache.org/en/overview/mannual/java-sdk/tasks/extensibility/filter/).
|
||||
|
||||
## Dubbo resources
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ or add setting in `rpc-config.json` file, and its priority is lower than above.
|
|||
}
|
||||
```
|
||||
|
||||
For more details of SOFARPC filter, see [here](https://www.sofastack.tech/projects/sofa-rpc/custom-filter/).
|
||||
For more details of SOFARPC filter, see [SOFARPC filter documentation](https://www.sofastack.tech/projects/sofa-rpc/custom-filter/).
|
||||
|
||||
## SOFARPC resources
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
# Sentinel Spring Cloud Gateway Adapter
|
||||
|
||||
Sentinel provides integration module with Spring Cloud Gateway.
|
||||
The integration module is based on the Sentinel Reactor Adapter.
|
||||
> This module is quite similar to sentinel-spring-cloud-gateway-adapter and The difference is that this module has made some adaptations for webflux 6.x.
|
||||
> The usage is consistent with sentinel-spring-cloud-gateway-adapter, with the difference being that the maven dependency is different.
|
||||
|
||||
Add the following dependency in `pom.xml` (if you are using Maven):
|
||||
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>com.alibaba.csp</groupId>
|
||||
<artifactId>sentinel-spring-cloud-gateway-v6x-adapter</artifactId>
|
||||
<version>x.y.z</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
Then you only need to inject the corresponding `SentinelGatewayFilter` and `SentinelGatewayBlockExceptionHandler` instance
|
||||
in Spring configuration. For example:
|
||||
|
||||
```java
|
||||
@Configuration
|
||||
public class GatewayConfiguration {
|
||||
|
||||
private final List<ViewResolver> viewResolvers;
|
||||
private final ServerCodecConfigurer serverCodecConfigurer;
|
||||
|
||||
public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider,
|
||||
ServerCodecConfigurer serverCodecConfigurer) {
|
||||
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
|
||||
this.serverCodecConfigurer = serverCodecConfigurer;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Order(-1)
|
||||
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
|
||||
// Register the block exception handler for Spring Cloud Gateway.
|
||||
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Order(-1)
|
||||
public GlobalFilter sentinelGatewayFilter() {
|
||||
return new SentinelGatewayFilter();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The gateway adapter will regard all `routeId` (defined in Spring properties) and all customized API definitions
|
||||
(defined in `GatewayApiDefinitionManager` of `sentinel-api-gateway-adapter-common` module) as resources.
|
||||
|
||||
You can register various customized callback in `GatewayCallbackManager`:
|
||||
|
||||
- `setBlockHandler`: register a customized `BlockRequestHandler` to handle the blocked request. The default implementation is `DefaultBlockRequestHandler`, which returns default message like `Blocked by Sentinel: FlowException`.
|
||||
|
|
@ -0,0 +1,104 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>sentinel-adapter</artifactId>
|
||||
<groupId>com.alibaba.csp</groupId>
|
||||
<version>1.8.8</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>sentinel-spring-cloud-gateway-v6x-adapter</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<java.source.version>17</java.source.version>
|
||||
<java.target.version>17</java.target.version>
|
||||
<spring.cloud.gateway.version>4.3.0</spring.cloud.gateway.version>
|
||||
<spring.boot.version>3.5.0</spring.boot.version>
|
||||
<spring.version>6.2.7</spring.version>
|
||||
<skip.spring.v6x.test>false</skip.spring.v6x.test>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.csp</groupId>
|
||||
<artifactId>sentinel-api-gateway-adapter-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.csp</groupId>
|
||||
<artifactId>sentinel-reactor-adapter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-gateway-server</artifactId>
|
||||
<version>${spring.cloud.gateway.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-webflux</artifactId>
|
||||
<version>${spring.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-gateway</artifactId>
|
||||
<version>${spring.cloud.gateway.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
<version>${spring.boot.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<version>${spring.boot.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${maven.surefire.version}</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.maven.surefire</groupId>
|
||||
<artifactId>surefire-junit47</artifactId>
|
||||
<version>3.2.5</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<configuration>
|
||||
<skipTests>${skip.spring.v6x.test}</skipTests>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
</project>
|
||||
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc;
|
||||
|
||||
import com.alibaba.csp.sentinel.EntryType;
|
||||
import com.alibaba.csp.sentinel.ResourceTypeConstants;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.param.GatewayParamParser;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.param.RequestItemParser;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.sc.api.GatewayApiMatcherManager;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.sc.api.matcher.WebExchangeApiMatcher;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
|
||||
import com.alibaba.csp.sentinel.adapter.reactor.ContextConfig;
|
||||
import com.alibaba.csp.sentinel.adapter.reactor.EntryConfig;
|
||||
import com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorTransformer;
|
||||
import com.alibaba.csp.sentinel.util.AssertUtil;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilter;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
||||
import org.springframework.cloud.gateway.filter.GlobalFilter;
|
||||
import org.springframework.cloud.gateway.route.Route;
|
||||
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter, Ordered {
|
||||
|
||||
private final int order;
|
||||
|
||||
private final GatewayParamParser<ServerWebExchange> paramParser;
|
||||
|
||||
public SentinelGatewayFilter() {
|
||||
this(Ordered.HIGHEST_PRECEDENCE);
|
||||
}
|
||||
|
||||
public SentinelGatewayFilter(int order) {
|
||||
this(order, new ServerWebExchangeItemParser());
|
||||
}
|
||||
|
||||
public SentinelGatewayFilter(RequestItemParser<ServerWebExchange> serverWebExchangeItemParser) {
|
||||
this(Ordered.HIGHEST_PRECEDENCE, serverWebExchangeItemParser);
|
||||
}
|
||||
|
||||
public SentinelGatewayFilter(int order, RequestItemParser<ServerWebExchange> requestItemParser) {
|
||||
AssertUtil.notNull(requestItemParser, "requestItemParser cannot be null");
|
||||
this.order = order;
|
||||
this.paramParser = new GatewayParamParser<>(requestItemParser);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
||||
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
|
||||
|
||||
Mono<Void> asyncResult = chain.filter(exchange);
|
||||
if (route != null) {
|
||||
String routeId = route.getId();
|
||||
Object[] params = paramParser.parseParameterFor(routeId, exchange,
|
||||
r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID);
|
||||
String origin = Optional.ofNullable(GatewayCallbackManager.getRequestOriginParser())
|
||||
.map(f -> f.apply(exchange))
|
||||
.orElse("");
|
||||
asyncResult = asyncResult.transform(
|
||||
new SentinelReactorTransformer<>(new EntryConfig(routeId, ResourceTypeConstants.COMMON_API_GATEWAY,
|
||||
EntryType.IN, 1, params, new ContextConfig(contextName(routeId), origin)))
|
||||
);
|
||||
}
|
||||
|
||||
Set<String> matchingApis = pickMatchingApiDefinitions(exchange);
|
||||
for (String apiName : matchingApis) {
|
||||
Object[] params = paramParser.parseParameterFor(apiName, exchange,
|
||||
r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME);
|
||||
asyncResult = asyncResult.transform(
|
||||
new SentinelReactorTransformer<>(new EntryConfig(apiName, ResourceTypeConstants.COMMON_API_GATEWAY,
|
||||
EntryType.IN, 1, params))
|
||||
);
|
||||
}
|
||||
|
||||
return asyncResult;
|
||||
}
|
||||
|
||||
private String contextName(String route) {
|
||||
return SentinelGatewayConstants.GATEWAY_CONTEXT_ROUTE_PREFIX + route;
|
||||
}
|
||||
|
||||
Set<String> pickMatchingApiDefinitions(ServerWebExchange exchange) {
|
||||
return GatewayApiMatcherManager.getApiMatcherMap().values()
|
||||
.stream()
|
||||
.filter(m -> m.test(exchange))
|
||||
.map(WebExchangeApiMatcher::getApiName)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return order;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.param.RequestItemParser;
|
||||
|
||||
import org.springframework.http.HttpCookie;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class ServerWebExchangeItemParser implements RequestItemParser<ServerWebExchange> {
|
||||
|
||||
@Override
|
||||
public String getPath(ServerWebExchange exchange) {
|
||||
return exchange.getRequest().getPath().value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteAddress(ServerWebExchange exchange) {
|
||||
InetSocketAddress remoteAddress = exchange.getRequest().getRemoteAddress();
|
||||
if (remoteAddress == null) {
|
||||
return null;
|
||||
}
|
||||
return remoteAddress.getAddress().getHostAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHeader(ServerWebExchange exchange, String key) {
|
||||
return exchange.getRequest().getHeaders().getFirst(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUrlParam(ServerWebExchange exchange, String paramName) {
|
||||
return exchange.getRequest().getQueryParams().getFirst(paramName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCookieValue(ServerWebExchange exchange, String cookieName) {
|
||||
return Optional.ofNullable(exchange.getRequest().getCookies().getFirst(cookieName))
|
||||
.map(HttpCookie::getValue)
|
||||
.orElse(null);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.api;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.sc.api.matcher.WebExchangeApiMatcher;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public final class GatewayApiMatcherManager {
|
||||
|
||||
private static volatile Map<String, WebExchangeApiMatcher> API_MATCHER_MAP = new HashMap<>();
|
||||
|
||||
public static Map<String, WebExchangeApiMatcher> getApiMatcherMap() {
|
||||
return Collections.unmodifiableMap(API_MATCHER_MAP);
|
||||
}
|
||||
|
||||
public static Optional<WebExchangeApiMatcher> getMatcher(final String apiName) {
|
||||
return Optional.ofNullable(apiName)
|
||||
.map(e -> API_MATCHER_MAP.get(apiName));
|
||||
}
|
||||
|
||||
public static Set<ApiDefinition> getApiDefinitionSet() {
|
||||
return API_MATCHER_MAP.values()
|
||||
.stream()
|
||||
.map(WebExchangeApiMatcher::getApiDefinition)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
static synchronized void loadApiDefinitions(/*@Valid*/ Set<ApiDefinition> definitions) {
|
||||
Map<String, WebExchangeApiMatcher> apiMatcherMap = new HashMap<>();
|
||||
for (ApiDefinition definition : definitions) {
|
||||
apiMatcherMap.put(definition.getApiName(), new WebExchangeApiMatcher(definition));
|
||||
}
|
||||
|
||||
API_MATCHER_MAP = apiMatcherMap;
|
||||
}
|
||||
|
||||
private GatewayApiMatcherManager() {}
|
||||
}
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.api;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinitionChangeObserver;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class SpringCloudGatewayApiDefinitionChangeObserver implements ApiDefinitionChangeObserver {
|
||||
|
||||
@Override
|
||||
public void onChange(Set<ApiDefinition> apiDefinitions) {
|
||||
GatewayApiMatcherManager.loadApiDefinitions(apiDefinitions);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.api.matcher;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.matcher.AbstractApiMatcher;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.sc.route.RouteMatchers;
|
||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||
import com.alibaba.csp.sentinel.util.function.Predicate;
|
||||
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class WebExchangeApiMatcher extends AbstractApiMatcher<ServerWebExchange> {
|
||||
|
||||
public WebExchangeApiMatcher(ApiDefinition apiDefinition) {
|
||||
super(apiDefinition);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initializeMatchers() {
|
||||
if (apiDefinition.getPredicateItems() != null) {
|
||||
apiDefinition.getPredicateItems().forEach(item ->
|
||||
fromApiPredicate(item).ifPresent(matchers::add));
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<Predicate<ServerWebExchange>> fromApiPredicate(/*@NonNull*/ ApiPredicateItem item) {
|
||||
if (item instanceof ApiPathPredicateItem) {
|
||||
return fromApiPathPredicate((ApiPathPredicateItem)item);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private Optional<Predicate<ServerWebExchange>> fromApiPathPredicate(/*@Valid*/ ApiPathPredicateItem item) {
|
||||
String pattern = item.getPattern();
|
||||
if (StringUtil.isBlank(pattern)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
switch (item.getMatchStrategy()) {
|
||||
case SentinelGatewayConstants.URL_MATCH_STRATEGY_REGEX:
|
||||
return Optional.of(RouteMatchers.regexPath(pattern));
|
||||
case SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX:
|
||||
return Optional.of(RouteMatchers.antPath(pattern));
|
||||
default:
|
||||
return Optional.of(RouteMatchers.exactPath(pattern));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
|
||||
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Reactive handler for the blocked request.
|
||||
*
|
||||
* @author Eric Zhao
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface BlockRequestHandler {
|
||||
|
||||
/**
|
||||
* Handle the blocked request.
|
||||
*
|
||||
* @param exchange server exchange object
|
||||
* @param t block exception
|
||||
* @return server response to return
|
||||
*/
|
||||
Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t);
|
||||
}
|
||||
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.InvalidMediaTypeException;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
|
||||
/**
|
||||
* The default implementation of {@link BlockRequestHandler}.
|
||||
* Compatible with Spring WebFlux v6x and Spring Cloud Gateway.
|
||||
*
|
||||
* @author uuuyuqi
|
||||
*/
|
||||
public class DefaultBlockRequestHandler implements BlockRequestHandler {
|
||||
|
||||
private static final String DEFAULT_BLOCK_MSG_PREFIX = "Blocked by Sentinel: ";
|
||||
|
||||
@Override
|
||||
public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable ex) {
|
||||
if (acceptsHtml(exchange)) {
|
||||
return htmlErrorResponse(ex);
|
||||
}
|
||||
// JSON result by default.
|
||||
return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(buildErrorResult(ex));
|
||||
}
|
||||
|
||||
private Mono<ServerResponse> htmlErrorResponse(Throwable ex) {
|
||||
return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.bodyValue(DEFAULT_BLOCK_MSG_PREFIX + ex.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
private ErrorResult buildErrorResult(Throwable ex) {
|
||||
return new ErrorResult(HttpStatus.TOO_MANY_REQUESTS.value(),
|
||||
DEFAULT_BLOCK_MSG_PREFIX + ex.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Reference from {@code DefaultErrorWebExceptionHandler} of Spring Boot.
|
||||
*/
|
||||
private boolean acceptsHtml(ServerWebExchange exchange) {
|
||||
try {
|
||||
List<MediaType> acceptedMediaTypes = exchange.getRequest().getHeaders().getAccept();
|
||||
acceptedMediaTypes.remove(MediaType.ALL);
|
||||
MimeTypeUtils.sortBySpecificity(acceptedMediaTypes);
|
||||
return acceptedMediaTypes.stream()
|
||||
.anyMatch(MediaType.TEXT_HTML::isCompatibleWith);
|
||||
} catch (InvalidMediaTypeException ex) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ErrorResult {
|
||||
private final int code;
|
||||
private final String message;
|
||||
|
||||
ErrorResult(int code, String message) {
|
||||
this.code = code;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public int getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.alibaba.csp.sentinel.util.AssertUtil;
|
||||
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public final class GatewayCallbackManager {
|
||||
|
||||
private static final Function<ServerWebExchange, String> DEFAULT_ORIGIN_PARSER = (w) -> "";
|
||||
|
||||
/**
|
||||
* BlockRequestHandler: (serverExchange, exception) -> response
|
||||
*/
|
||||
private static volatile BlockRequestHandler blockHandler = new DefaultBlockRequestHandler();
|
||||
/**
|
||||
* RequestOriginParser: (serverExchange) -> origin
|
||||
*/
|
||||
private static volatile Function<ServerWebExchange, String> requestOriginParser = DEFAULT_ORIGIN_PARSER;
|
||||
|
||||
public static /*@NonNull*/ BlockRequestHandler getBlockHandler() {
|
||||
return blockHandler;
|
||||
}
|
||||
|
||||
public static void resetBlockHandler() {
|
||||
GatewayCallbackManager.blockHandler = new DefaultBlockRequestHandler();
|
||||
}
|
||||
|
||||
public static void setBlockHandler(BlockRequestHandler blockHandler) {
|
||||
AssertUtil.notNull(blockHandler, "blockHandler cannot be null");
|
||||
GatewayCallbackManager.blockHandler = blockHandler;
|
||||
}
|
||||
|
||||
public static /*@NonNull*/ Function<ServerWebExchange, String> getRequestOriginParser() {
|
||||
return requestOriginParser;
|
||||
}
|
||||
|
||||
public static void resetRequestOriginParser() {
|
||||
GatewayCallbackManager.requestOriginParser = DEFAULT_ORIGIN_PARSER;
|
||||
}
|
||||
|
||||
public static void setRequestOriginParser(Function<ServerWebExchange, String> requestOriginParser) {
|
||||
AssertUtil.notNull(requestOriginParser, "requestOriginParser cannot be null");
|
||||
GatewayCallbackManager.requestOriginParser = requestOriginParser;
|
||||
}
|
||||
|
||||
private GatewayCallbackManager() {}
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.callback;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import com.alibaba.csp.sentinel.util.AssertUtil;
|
||||
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class RedirectBlockRequestHandler implements BlockRequestHandler {
|
||||
|
||||
private final URI uri;
|
||||
|
||||
public RedirectBlockRequestHandler(String url) {
|
||||
AssertUtil.assertNotBlank(url, "url cannot be blank");
|
||||
this.uri = URI.create(url);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) {
|
||||
return ServerResponse.temporaryRedirect(uri).build();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.exception;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
|
||||
import com.alibaba.csp.sentinel.slots.block.BlockException;
|
||||
import com.alibaba.csp.sentinel.util.function.Supplier;
|
||||
|
||||
import org.springframework.http.codec.HttpMessageWriter;
|
||||
import org.springframework.http.codec.ServerCodecConfigurer;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import org.springframework.web.reactive.result.view.ViewResolver;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.WebExceptionHandler;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class SentinelGatewayBlockExceptionHandler implements WebExceptionHandler {
|
||||
|
||||
private List<ViewResolver> viewResolvers;
|
||||
private List<HttpMessageWriter<?>> messageWriters;
|
||||
|
||||
public SentinelGatewayBlockExceptionHandler(List<ViewResolver> viewResolvers, ServerCodecConfigurer serverCodecConfigurer) {
|
||||
this.viewResolvers = viewResolvers;
|
||||
this.messageWriters = serverCodecConfigurer.getWriters();
|
||||
}
|
||||
|
||||
private Mono<Void> writeResponse(ServerResponse response, ServerWebExchange exchange) {
|
||||
return response.writeTo(exchange, contextSupplier.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
|
||||
if (exchange.getResponse().isCommitted()) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
// This exception handler only handles rejection by Sentinel.
|
||||
if (!BlockException.isBlockException(ex)) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
return handleBlockedRequest(exchange, ex)
|
||||
.flatMap(response -> writeResponse(response, exchange));
|
||||
}
|
||||
|
||||
private Mono<ServerResponse> handleBlockedRequest(ServerWebExchange exchange, Throwable throwable) {
|
||||
return GatewayCallbackManager.getBlockHandler().handleRequest(exchange, throwable);
|
||||
}
|
||||
|
||||
private final Supplier<ServerResponse.Context> contextSupplier = () -> new ServerResponse.Context() {
|
||||
@Override
|
||||
public List<HttpMessageWriter<?>> messageWriters() {
|
||||
return SentinelGatewayBlockExceptionHandler.this.messageWriters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ViewResolver> viewResolvers() {
|
||||
return SentinelGatewayBlockExceptionHandler.this.viewResolvers;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.route;
|
||||
|
||||
import com.alibaba.csp.sentinel.util.AssertUtil;
|
||||
import com.alibaba.csp.sentinel.util.function.Predicate;
|
||||
|
||||
import org.springframework.util.AntPathMatcher;
|
||||
import org.springframework.util.PathMatcher;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class AntRoutePathMatcher implements Predicate<ServerWebExchange> {
|
||||
|
||||
private final String pattern;
|
||||
|
||||
private final PathMatcher pathMatcher;
|
||||
private final boolean canMatch;
|
||||
|
||||
public AntRoutePathMatcher(String pattern) {
|
||||
AssertUtil.assertNotBlank(pattern, "pattern cannot be blank");
|
||||
this.pattern = pattern;
|
||||
this.pathMatcher = new AntPathMatcher();
|
||||
this.canMatch = pathMatcher.isPattern(pattern);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean test(ServerWebExchange exchange) {
|
||||
String path = exchange.getRequest().getPath().value();
|
||||
if (canMatch) {
|
||||
return pathMatcher.match(pattern, path);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public String getPattern() {
|
||||
return pattern;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.route;
|
||||
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import com.alibaba.csp.sentinel.util.AssertUtil;
|
||||
import com.alibaba.csp.sentinel.util.function.Predicate;
|
||||
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public class RegexRoutePathMatcher implements Predicate<ServerWebExchange> {
|
||||
|
||||
private final String pattern;
|
||||
private final Pattern regex;
|
||||
|
||||
public RegexRoutePathMatcher(String pattern) {
|
||||
AssertUtil.assertNotBlank(pattern, "pattern cannot be blank");
|
||||
this.pattern = pattern;
|
||||
this.regex = Pattern.compile(pattern);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean test(ServerWebExchange exchange) {
|
||||
String path = exchange.getRequest().getPath().value();
|
||||
return regex.matcher(path).matches();
|
||||
}
|
||||
|
||||
public String getPattern() {
|
||||
return pattern;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc.route;
|
||||
|
||||
|
||||
import com.alibaba.csp.sentinel.util.function.Predicate;
|
||||
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.6.0
|
||||
*/
|
||||
public final class RouteMatchers {
|
||||
|
||||
public static Predicate<ServerWebExchange> all() {
|
||||
return exchange -> true;
|
||||
}
|
||||
|
||||
public static Predicate<ServerWebExchange> antPath(String pathPattern) {
|
||||
return new AntRoutePathMatcher(pathPattern);
|
||||
}
|
||||
|
||||
public static Predicate<ServerWebExchange> exactPath(final String path) {
|
||||
return exchange -> exchange.getRequest().getPath().value().equals(path);
|
||||
}
|
||||
|
||||
public static Predicate<ServerWebExchange> regexPath(String pathPattern) {
|
||||
return new RegexRoutePathMatcher(pathPattern);
|
||||
}
|
||||
|
||||
private RouteMatchers() {}
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
com.alibaba.csp.sentinel.adapter.gateway.sc.api.SpringCloudGatewayApiDefinitionChangeObserver
|
||||
|
|
@ -0,0 +1,91 @@
|
|||
package com.alibaba.csp.sentinel.adapter.gateway.sc;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.http.server.RequestPath;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test cases for {@link SentinelGatewayFilter}.
|
||||
*
|
||||
* @author Eric Zhao
|
||||
*/
|
||||
public class SentinelGatewayFilterTest {
|
||||
|
||||
@Test
|
||||
public void testPickMatchingApiDefinitions() {
|
||||
// Mock a request.
|
||||
ServerWebExchange exchange = mock(ServerWebExchange.class);
|
||||
ServerHttpRequest request = mock(ServerHttpRequest.class);
|
||||
when(exchange.getRequest()).thenReturn(request);
|
||||
RequestPath requestPath = mock(RequestPath.class);
|
||||
when(request.getPath()).thenReturn(requestPath);
|
||||
|
||||
// Prepare API definitions.
|
||||
Set<ApiDefinition> apiDefinitions = new HashSet<>();
|
||||
String apiName1 = "some_customized_api";
|
||||
ApiDefinition api1 = new ApiDefinition(apiName1)
|
||||
.setPredicateItems(Collections.singleton(
|
||||
new ApiPathPredicateItem().setPattern("/product/**")
|
||||
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX)
|
||||
));
|
||||
String apiName2 = "another_customized_api";
|
||||
ApiDefinition api2 = new ApiDefinition(apiName2)
|
||||
.setPredicateItems(new HashSet<ApiPredicateItem>() {{
|
||||
add(new ApiPathPredicateItem().setPattern("/something"));
|
||||
add(new ApiPathPredicateItem().setPattern("/other/**")
|
||||
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
|
||||
}});
|
||||
apiDefinitions.add(api1);
|
||||
apiDefinitions.add(api2);
|
||||
GatewayApiDefinitionManager.loadApiDefinitions(apiDefinitions);
|
||||
SentinelGatewayFilter filter = new SentinelGatewayFilter();
|
||||
|
||||
when(requestPath.value()).thenReturn("/product/123");
|
||||
Set<String> matchingApis = filter.pickMatchingApiDefinitions(exchange);
|
||||
assertThat(matchingApis.size()).isEqualTo(1);
|
||||
assertThat(matchingApis.contains(apiName1)).isTrue();
|
||||
|
||||
when(requestPath.value()).thenReturn("/products");
|
||||
assertThat(filter.pickMatchingApiDefinitions(exchange).size()).isZero();
|
||||
|
||||
when(requestPath.value()).thenReturn("/something");
|
||||
matchingApis = filter.pickMatchingApiDefinitions(exchange);
|
||||
assertThat(matchingApis.size()).isEqualTo(1);
|
||||
assertThat(matchingApis.contains(apiName2)).isTrue();
|
||||
|
||||
when(requestPath.value()).thenReturn("/other/foo/3");
|
||||
matchingApis = filter.pickMatchingApiDefinitions(exchange);
|
||||
assertThat(matchingApis.size()).isEqualTo(1);
|
||||
assertThat(matchingApis.contains(apiName2)).isTrue();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
|
||||
GatewayRuleManager.loadRules(new HashSet<>());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
|
||||
GatewayRuleManager.loadRules(new HashSet<>());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,209 @@
|
|||
/*
|
||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.adapter.gateway.sc;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.param.GatewayParamParser;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayParamFlowItem;
|
||||
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
|
||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.server.RequestPath;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
*/
|
||||
public class SpringCloudGatewayParamParserTest {
|
||||
|
||||
private final GatewayParamParser<ServerWebExchange> paramParser = new GatewayParamParser<>(
|
||||
new ServerWebExchangeItemParser()
|
||||
);
|
||||
|
||||
@Test
|
||||
public void testParseParametersNoParamItem() {
|
||||
// Mock a request.
|
||||
ServerWebExchange exchange = mock(ServerWebExchange.class);
|
||||
// Prepare gateway rules.
|
||||
Set<GatewayFlowRule> rules = new HashSet<>();
|
||||
String routeId1 = "my_test_route_A";
|
||||
rules.add(new GatewayFlowRule(routeId1)
|
||||
.setCount(5)
|
||||
.setIntervalSec(1)
|
||||
);
|
||||
GatewayRuleManager.loadRules(rules);
|
||||
|
||||
Object[] params = paramParser.parseParameterFor(routeId1, exchange,
|
||||
e -> e.getResourceMode() == 0);
|
||||
assertThat(params.length).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseParametersWithItems() {
|
||||
// Mock a request.
|
||||
ServerWebExchange exchange = mock(ServerWebExchange.class);
|
||||
ServerHttpRequest request = mock(ServerHttpRequest.class);
|
||||
when(exchange.getRequest()).thenReturn(request);
|
||||
RequestPath requestPath = mock(RequestPath.class);
|
||||
when(request.getPath()).thenReturn(requestPath);
|
||||
|
||||
// Prepare gateway rules.
|
||||
Set<GatewayFlowRule> rules = new HashSet<>();
|
||||
String routeId1 = "my_test_route_A";
|
||||
String api1 = "my_test_route_B";
|
||||
String headerName = "X-Sentinel-Flag";
|
||||
String paramName = "p";
|
||||
GatewayFlowRule routeRule1 = new GatewayFlowRule(routeId1)
|
||||
.setCount(2)
|
||||
.setIntervalSec(2)
|
||||
.setBurst(2)
|
||||
.setParamItem(new GatewayParamFlowItem()
|
||||
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_CLIENT_IP)
|
||||
);
|
||||
GatewayFlowRule routeRule2 = new GatewayFlowRule(routeId1)
|
||||
.setCount(10)
|
||||
.setIntervalSec(1)
|
||||
.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)
|
||||
.setMaxQueueingTimeoutMs(600)
|
||||
.setParamItem(new GatewayParamFlowItem()
|
||||
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HEADER)
|
||||
.setFieldName(headerName)
|
||||
);
|
||||
GatewayFlowRule routeRule3 = new GatewayFlowRule(routeId1)
|
||||
.setCount(20)
|
||||
.setIntervalSec(1)
|
||||
.setBurst(5)
|
||||
.setParamItem(new GatewayParamFlowItem()
|
||||
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)
|
||||
.setFieldName(paramName)
|
||||
);
|
||||
GatewayFlowRule routeRule4 = new GatewayFlowRule(routeId1)
|
||||
.setCount(120)
|
||||
.setIntervalSec(10)
|
||||
.setBurst(30)
|
||||
.setParamItem(new GatewayParamFlowItem()
|
||||
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HOST)
|
||||
);
|
||||
GatewayFlowRule apiRule1 = new GatewayFlowRule(api1)
|
||||
.setResourceMode(SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME)
|
||||
.setCount(5)
|
||||
.setIntervalSec(1)
|
||||
.setParamItem(new GatewayParamFlowItem()
|
||||
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)
|
||||
.setFieldName(paramName)
|
||||
);
|
||||
rules.add(routeRule1);
|
||||
rules.add(routeRule2);
|
||||
rules.add(routeRule3);
|
||||
rules.add(routeRule4);
|
||||
rules.add(apiRule1);
|
||||
GatewayRuleManager.loadRules(rules);
|
||||
|
||||
String expectedHost = "hello.test.sentinel";
|
||||
String expectedAddress = "66.77.88.99";
|
||||
String expectedHeaderValue1 = "Sentinel";
|
||||
String expectedUrlParamValue1 = "17";
|
||||
mockClientHostAddress(request, expectedAddress);
|
||||
Map<String, String> expectedHeaders = new HashMap<String, String>() {{
|
||||
put(headerName, expectedHeaderValue1); put("Host", expectedHost);
|
||||
}};
|
||||
mockHeaders(request, expectedHeaders);
|
||||
mockSingleUrlParam(request, paramName, expectedUrlParamValue1);
|
||||
Object[] params = paramParser.parseParameterFor(routeId1, exchange, e -> e.getResourceMode() == 0);
|
||||
assertThat(params.length).isEqualTo(4);
|
||||
assertThat(params[routeRule1.getParamItem().getIndex()]).isEqualTo(expectedAddress);
|
||||
assertThat(params[routeRule2.getParamItem().getIndex()]).isEqualTo(expectedHeaderValue1);
|
||||
assertThat(params[routeRule3.getParamItem().getIndex()]).isEqualTo(expectedUrlParamValue1);
|
||||
assertThat(params[routeRule4.getParamItem().getIndex()]).isEqualTo(expectedHost);
|
||||
|
||||
assertThat(paramParser.parseParameterFor(api1, exchange, e -> e.getResourceMode() == 0).length).isZero();
|
||||
|
||||
String expectedUrlParamValue2 = "fs";
|
||||
mockSingleUrlParam(request, paramName, expectedUrlParamValue2);
|
||||
params = paramParser.parseParameterFor(api1, exchange, e -> e.getResourceMode() == 1);
|
||||
assertThat(params.length).isEqualTo(1);
|
||||
assertThat(params[apiRule1.getParamItem().getIndex()]).isEqualTo(expectedUrlParamValue2);
|
||||
}
|
||||
|
||||
private void mockClientHostAddress(/*@Mock*/ ServerHttpRequest request, String address) {
|
||||
InetSocketAddress socketAddress = mock(InetSocketAddress.class);
|
||||
when(request.getRemoteAddress()).thenReturn(socketAddress);
|
||||
InetAddress inetAddress = mock(InetAddress.class);
|
||||
when(inetAddress.getHostAddress()).thenReturn(address);
|
||||
when(socketAddress.getAddress()).thenReturn(inetAddress);
|
||||
}
|
||||
|
||||
private void mockHeaders(/*@Mock*/ ServerHttpRequest request, Map<String, String> headerMap) {
|
||||
HttpHeaders headers = mock(HttpHeaders.class);
|
||||
when(request.getHeaders()).thenReturn(headers);
|
||||
for (Map.Entry<String, String> e : headerMap.entrySet()) {
|
||||
when(headers.getFirst(e.getKey())).thenReturn(e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void mockUrlParams(/*@Mock*/ ServerHttpRequest request, Map<String, String> paramMap) {
|
||||
MultiValueMap<String, String> urlParams = mock(MultiValueMap.class);
|
||||
when(request.getQueryParams()).thenReturn(urlParams);
|
||||
for (Map.Entry<String, String> e : paramMap.entrySet()) {
|
||||
when(urlParams.getFirst(e.getKey())).thenReturn(e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void mockSingleUrlParam(/*@Mock*/ ServerHttpRequest request, String key, String value) {
|
||||
MultiValueMap<String, String> urlParams = mock(MultiValueMap.class);
|
||||
when(request.getQueryParams()).thenReturn(urlParams);
|
||||
when(urlParams.getFirst(key)).thenReturn(value);
|
||||
}
|
||||
|
||||
private void mockSingleHeader(/*@Mock*/ ServerHttpRequest request, String key, String value) {
|
||||
HttpHeaders headers = mock(HttpHeaders.class);
|
||||
when(request.getHeaders()).thenReturn(headers);
|
||||
when(headers.getFirst(key)).thenReturn(value);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
|
||||
GatewayRuleManager.loadRules(new HashSet<>());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
GatewayApiDefinitionManager.loadApiDefinitions(new HashSet<>());
|
||||
GatewayRuleManager.loadRules(new HashSet<>());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
mock-maker-inline
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
<spring.boot.version>3.0.0</spring.boot.version>
|
||||
<servlet.api.version>6.0.0</servlet.api.version>
|
||||
<jakarta.xml.bind-api.version>4.0.0</jakarta.xml.bind-api.version>
|
||||
<slf4j-api.version>2.0.4</slf4j-api.version>
|
||||
<slf4j-api.version>2.0.16</slf4j-api.version>
|
||||
|
||||
<skip.spring.v6x.test>false</skip.spring.v6x.test>
|
||||
</properties>
|
||||
|
|
|
|||
|
|
@ -16,7 +16,13 @@
|
|||
package com.alibaba.csp.sentinel.cluster.client;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants;
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
|
||||
|
|
@ -35,6 +41,7 @@ import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
|
|||
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
|
||||
import com.alibaba.csp.sentinel.log.RecordLog;
|
||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
|
||||
|
||||
/**
|
||||
* Default implementation of {@link ClusterTokenClient}.
|
||||
|
|
@ -43,11 +50,30 @@ import com.alibaba.csp.sentinel.util.StringUtil;
|
|||
* @since 1.4.0
|
||||
*/
|
||||
public class DefaultClusterTokenClient implements ClusterTokenClient {
|
||||
public class CachedTokenData {
|
||||
AtomicInteger count;
|
||||
AtomicInteger lastStatus;
|
||||
AtomicLong lastWaitUntilMs;
|
||||
AtomicInteger lastWaitPrefetchCnt;
|
||||
AtomicInteger lastRemaining;
|
||||
public CachedTokenData() {
|
||||
count = new AtomicInteger(0);
|
||||
lastStatus = new AtomicInteger(TokenResultStatus.OK);
|
||||
lastWaitUntilMs = new AtomicLong(0);
|
||||
lastWaitPrefetchCnt = new AtomicInteger(0);
|
||||
lastRemaining = new AtomicInteger(0);
|
||||
}
|
||||
}
|
||||
|
||||
private ClusterTransportClient transportClient;
|
||||
private TokenServerDescriptor serverDescriptor;
|
||||
|
||||
private final AtomicBoolean shouldStart = new AtomicBoolean(false);
|
||||
private int checkInterval = 2;
|
||||
ConcurrentHashMap<Long, CachedTokenData> localPrefetchedTokens = new ConcurrentHashMap<>();
|
||||
@SuppressWarnings("PMD.ThreadPoolCreationRule")
|
||||
private final ScheduledExecutorService prefetchScheduler = Executors.newScheduledThreadPool(2,
|
||||
new NamedThreadFactory("sentinel-cluster-prefetch-scheduler", true));
|
||||
|
||||
public DefaultClusterTokenClient() {
|
||||
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
|
||||
|
|
@ -146,6 +172,141 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
|
|||
return serverDescriptor;
|
||||
}
|
||||
|
||||
public void setInterval(int val) {
|
||||
checkInterval = val;
|
||||
}
|
||||
|
||||
public void resetCache() {
|
||||
localPrefetchedTokens.clear();
|
||||
}
|
||||
|
||||
public int currentRuleCached(Long flowId) {
|
||||
CachedTokenData d = localPrefetchedTokens.get(flowId);
|
||||
if (d == null) {
|
||||
return 0;
|
||||
}
|
||||
return d.count.get();
|
||||
}
|
||||
|
||||
private void preFetch(Long flowId, CachedTokenData value, int prefetchCnt) {
|
||||
long waitUntil = value.lastWaitUntilMs.get();
|
||||
if (waitUntil > 0 && System.currentTimeMillis() < waitUntil) {
|
||||
return;
|
||||
}
|
||||
if (waitUntil > 0) {
|
||||
value.count.addAndGet(value.lastWaitPrefetchCnt.get());
|
||||
value.lastStatus.set(TokenResultStatus.OK);
|
||||
value.lastWaitUntilMs.set(0);
|
||||
value.lastWaitPrefetchCnt.set(0);
|
||||
}
|
||||
int current = value.count.get();
|
||||
if (current >= prefetchCnt / 2) {
|
||||
return;
|
||||
}
|
||||
if (current < -1 * prefetchCnt) {
|
||||
// avoid too much prefetch
|
||||
current = -1 * prefetchCnt;
|
||||
}
|
||||
prefetchCnt = prefetchCnt - current;
|
||||
TokenResult fetched = requestToken(flowId, prefetchCnt, true);
|
||||
value.lastWaitUntilMs.set(0);
|
||||
value.lastStatus.set(fetched.getStatus());
|
||||
value.lastRemaining.set(fetched.getRemaining());
|
||||
if (fetched.getStatus() == TokenResultStatus.OK) {
|
||||
value.count.addAndGet(prefetchCnt);
|
||||
} else if (fetched.getStatus() == TokenResultStatus.SHOULD_WAIT) {
|
||||
value.lastWaitUntilMs.set(System.currentTimeMillis() + fetched.getWaitInMs());
|
||||
value.lastWaitPrefetchCnt.set(prefetchCnt);
|
||||
}
|
||||
}
|
||||
|
||||
private TokenResult tryLocalCachedToken(CachedTokenData data, int acquireCount, int prefetchCnt) {
|
||||
int count = data.count.get();
|
||||
TokenResult ret = new TokenResult(data.lastStatus.get());
|
||||
ret.setFromCached(true);
|
||||
ret.setRemaining(data.lastRemaining.get());
|
||||
if (count >= acquireCount) {
|
||||
// here we allow the concurrency which may cause decrease to negative count, it
|
||||
// is just skipped some requests
|
||||
// and it will be refilled by the bg prefetch in next round.
|
||||
data.count.addAndGet(-1 * acquireCount);
|
||||
ret.setStatus(TokenResultStatus.OK);
|
||||
return ret;
|
||||
}
|
||||
if (acquireCount > prefetchCnt) {
|
||||
return null;
|
||||
}
|
||||
if (ret.getStatus() == TokenResultStatus.SHOULD_WAIT) {
|
||||
int newN = data.count.addAndGet(-1 * acquireCount);
|
||||
if (newN + data.lastWaitPrefetchCnt.get() < -1 * prefetchCnt) {
|
||||
data.count.addAndGet(acquireCount);
|
||||
if (acquireCount <= prefetchCnt / 2) {
|
||||
// since last status is still waiting, we should not block directly, make it failover to local
|
||||
ret.setStatus(TokenResultStatus.FAIL);
|
||||
return ret;
|
||||
}
|
||||
// for the large acquireCount, we can try remote again, since large request will
|
||||
// much slower which will have less pressure to remote
|
||||
return null;
|
||||
}
|
||||
int waitMs = (int) (data.lastWaitUntilMs.get() - System.currentTimeMillis());
|
||||
if (waitMs > 0) {
|
||||
ret.setWaitInMs(waitMs);
|
||||
}
|
||||
return ret;
|
||||
} else if (ret.getStatus() == TokenResultStatus.OK) {
|
||||
// last ok, but the cached count is not enough, we can preuse it to avoid remote
|
||||
// request too often,
|
||||
// otherwise just try remote request
|
||||
int newN = data.count.addAndGet(-1 * acquireCount);
|
||||
if (newN < -1 * prefetchCnt * 2) {
|
||||
// preuse failed since not enough, added it back
|
||||
data.count.addAndGet(acquireCount);
|
||||
if (acquireCount <= prefetchCnt / 2) {
|
||||
// since last is still ok, we should not block directly, make it failover to local
|
||||
ret.setStatus(TokenResultStatus.FAIL);
|
||||
return ret;
|
||||
}
|
||||
// for the large acquireCount, we can try remote again, since large request will much slower which will have less pressure to remote
|
||||
return null;
|
||||
}
|
||||
// preuse ok
|
||||
return ret;
|
||||
} else {
|
||||
// should fail directly
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TokenResult requestTokenWithCache(Long flowId, int acquireCount, int prefetchCnt) {
|
||||
if (notValidRequest(flowId, acquireCount)) {
|
||||
return badRequest();
|
||||
}
|
||||
// try local prefetched first
|
||||
CachedTokenData data = localPrefetchedTokens.get(flowId);
|
||||
if (data != null) {
|
||||
TokenResult ret = tryLocalCachedToken(data, acquireCount, prefetchCnt);
|
||||
if (ret != null) {
|
||||
return ret;
|
||||
}
|
||||
} else {
|
||||
localPrefetchedTokens.computeIfAbsent(flowId, k -> {
|
||||
CachedTokenData v = new CachedTokenData();
|
||||
prefetchScheduler.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
preFetch(flowId, v, prefetchCnt);
|
||||
} catch (Throwable e) {
|
||||
RecordLog.info("[DefaultClusterTokenClient] prefetch failed for flowId {}", flowId, e);
|
||||
}
|
||||
}, 0, checkInterval, TimeUnit.MILLISECONDS);
|
||||
return v;
|
||||
});
|
||||
}
|
||||
// fallback to remote request
|
||||
return requestToken(flowId, acquireCount, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
|
||||
if (notValidRequest(flowId, acquireCount)) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,432 @@
|
|||
package com.alibaba.csp.sentinel.cluster.client;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Spy;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.Assert;
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
|
||||
import com.alibaba.csp.sentinel.cluster.TokenResult;
|
||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class DefaultClusterTokenClientTest {
|
||||
|
||||
@Spy
|
||||
DefaultClusterTokenClient client = new DefaultClusterTokenClient();
|
||||
final int testInterval = 60;
|
||||
|
||||
@Test
|
||||
public void testClientCacheWithRemoteOK() throws Exception {
|
||||
client.setInterval(testInterval);
|
||||
client.resetCache();
|
||||
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
int prefetch = 10;
|
||||
TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
// first should remote
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long)TokenResultStatus.OK, (long)ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
|
||||
for (int i = 0;i < prefetch * 3; i++) {
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
}
|
||||
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
|
||||
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
|
||||
ret = client.requestTokenWithCache(1L, cnt, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
|
||||
}
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
Thread.sleep(testInterval * 2);
|
||||
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
|
||||
// should refill prefetch * 2 in once to make sure we have at least prefetch count in cache
|
||||
ret = client.requestTokenWithCache(1L, prefetch, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
Assert.assertEquals(0, client.currentRuleCached(1L));
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
Assert.assertEquals(prefetch / 2, client.currentRuleCached(1L));
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(prefetch / 2, client.currentRuleCached(1L));
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
// use less than half will not refill, so cache is not enough
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
Assert.assertEquals(prefetch / 2 - 1, client.currentRuleCached(1L));
|
||||
// refill at least prefetch at once, so we can get at most 1.5 * prefetch in cache
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
|
||||
ret = client.requestTokenWithCache(1L, prefetch, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
Assert.assertEquals(0, client.currentRuleCached(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientCacheWithRemoteWait() throws Exception {
|
||||
client.setInterval(testInterval);
|
||||
client.resetCache();
|
||||
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
int prefetch = 10;
|
||||
TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
// first should remote
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long)TokenResultStatus.OK, (long)ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
|
||||
TokenResult waitResult = new TokenResult(TokenResultStatus.SHOULD_WAIT);
|
||||
waitResult.setWaitInMs(testInterval * 4);
|
||||
doReturn(waitResult).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
|
||||
|
||||
for (int i = 0;i < prefetch * 3; i++) {
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
}
|
||||
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
|
||||
|
||||
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
|
||||
ret = client.requestTokenWithCache(1L, cnt, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
|
||||
}
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
|
||||
|
||||
Thread.sleep(testInterval * 2);
|
||||
// prefetch count will be 2 * prefetch, and last status became should wait
|
||||
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
|
||||
// refill will be waited until the timeout
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 3 <= ret.getWaitInMs());
|
||||
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval > ret.getWaitInMs());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 3 <= ret.getWaitInMs());
|
||||
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval > ret.getWaitInMs());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(-1 * prefetch * 2 - 1 - prefetch / 2 - 1, client.currentRuleCached(1L));
|
||||
// wait the timeout
|
||||
Thread.sleep(waitResult.getWaitInMs());
|
||||
// the prefetch count should be added to the count
|
||||
Assert.assertEquals(- 1 - prefetch / 2 - 1, client.currentRuleCached(1L));
|
||||
|
||||
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
Thread.sleep(waitResult.getWaitInMs());
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
|
||||
doReturn(waitResult).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch - 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
Assert.assertEquals(1, client.currentRuleCached(1L));
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
// refill will be waiting and the last state became waiting, but local has some tokens
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
Assert.assertEquals(0, client.currentRuleCached(1L));
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 2 <= ret.getWaitInMs());
|
||||
Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() > ret.getWaitInMs());
|
||||
ret = client.requestTokenWithCache(1L, prefetch - 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
|
||||
|
||||
Assert.assertEquals(-1 * prefetch * 2 + 1, client.currentRuleCached(1L));
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(-1 * prefetch * 2 + 1, client.currentRuleCached(1L));
|
||||
// refill will be waiting
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
|
||||
Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
|
||||
|
||||
Thread.sleep(waitResult.getWaitInMs() + testInterval);
|
||||
// the prefetch count should be added to the count
|
||||
Assert.assertEquals(-1 * prefetch, client.currentRuleCached(1L));
|
||||
Thread.sleep(waitResult.getWaitInMs() + testInterval);
|
||||
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientCacheWithRemoteBlocked() throws Exception {
|
||||
client.setInterval(testInterval);
|
||||
client.resetCache();
|
||||
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
int prefetch = 10;
|
||||
TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
// first should remote
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long)TokenResultStatus.OK, (long)ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
// begin test while remote refused
|
||||
doReturn(new TokenResult(TokenResultStatus.BLOCKED)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
for (int i = 0;i < prefetch * 3; i++) {
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
}
|
||||
Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
|
||||
|
||||
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
|
||||
ret = client.requestTokenWithCache(1L, cnt, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
|
||||
}
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
// refill will be blocked
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
|
||||
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
Thread.sleep(testInterval * 2);
|
||||
Assert.assertEquals(prefetch, client.currentRuleCached(1L));
|
||||
|
||||
doReturn(new TokenResult(TokenResultStatus.BLOCKED)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
|
||||
for (int i = 0;i < prefetch / 2 + 1; i++) {
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
}
|
||||
Thread.sleep(testInterval);
|
||||
Assert.assertEquals(prefetch - prefetch / 2 - 1, client.currentRuleCached(1L));
|
||||
// refill will be blocked and the last state became blocked, but local has some tokens
|
||||
ret = client.requestTokenWithCache(1L, prefetch - prefetch / 2 - 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
Assert.assertEquals(0, client.currentRuleCached(1L));
|
||||
|
||||
for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
|
||||
ret = client.requestTokenWithCache(1L, cnt, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
}
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
|
||||
Thread.sleep(testInterval);
|
||||
// refill will be blocked
|
||||
ret = client.requestTokenWithCache(1L, 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrencyRequestClientCache() throws Exception {
|
||||
client.setInterval(1);
|
||||
client.resetCache();
|
||||
doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
|
||||
int prefetch = 200;
|
||||
|
||||
ScheduledExecutorService testScheduler = Executors.newScheduledThreadPool(16,
|
||||
new NamedThreadFactory("test-scheduler", true));
|
||||
|
||||
AtomicInteger blocked = new AtomicInteger();
|
||||
AtomicInteger failed = new AtomicInteger();
|
||||
AtomicInteger ok = new AtomicInteger();
|
||||
AtomicInteger cached = new AtomicInteger();
|
||||
AtomicInteger notCached = new AtomicInteger();
|
||||
AtomicBoolean stopped = new AtomicBoolean(false);
|
||||
for (int concurrency = 0; concurrency < 8; concurrency++) {
|
||||
testScheduler.submit(() -> {
|
||||
System.out.println("running begin");
|
||||
for (int loop = 0; loop < 200; loop++) {
|
||||
for (int cnt = 1; cnt < prefetch * 2; cnt++) {
|
||||
TokenResult ret = client.requestTokenWithCache(1L, cnt, prefetch);
|
||||
if (cnt > prefetch * 1.5) {
|
||||
Assert.assertTrue(!ret.isFromCached());
|
||||
Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
|
||||
notCached.incrementAndGet();
|
||||
} else {
|
||||
if (ret.getStatus() == TokenResultStatus.BLOCKED) {
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
blocked.incrementAndGet();
|
||||
cached.incrementAndGet();
|
||||
} else if (ret.getStatus() == TokenResultStatus.FAIL) {
|
||||
Assert.assertTrue(ret.isFromCached());
|
||||
failed.incrementAndGet();
|
||||
cached.incrementAndGet();
|
||||
} else {
|
||||
ok.incrementAndGet();
|
||||
if (ret.isFromCached()) {
|
||||
cached.incrementAndGet();
|
||||
} else {
|
||||
notCached.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(0, ret.getWaitInMs());
|
||||
if (cnt % 50 == 0) {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (stopped.get()) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
System.out.println("running done");
|
||||
});
|
||||
}
|
||||
|
||||
testScheduler.submit(() -> {
|
||||
for (; !stopped.get() ;) {
|
||||
System.out.println("current rule cached: " + client.currentRuleCached(1L));
|
||||
System.out.println("current failed: " + failed.get() + ", passed: " + ok.get() + ", cached: " + cached.get() + ", not cached: " + notCached.get());
|
||||
try {
|
||||
Thread.sleep(3);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
Thread.sleep(2000);
|
||||
|
||||
stopped.set(true);
|
||||
testScheduler.shutdown();
|
||||
testScheduler.awaitTermination(1, TimeUnit.SECONDS);
|
||||
|
||||
System.out.println("current rule cached: " + client.currentRuleCached(1L));
|
||||
System.out.println("current failed: " + failed.get() + ", passed: " + ok.get() + ", cached: " + cached.get()
|
||||
+ ", not cached: " + notCached.get());
|
||||
Assert.assertTrue(blocked.get() + failed.get() < cached.get());
|
||||
Assert.assertTrue(cached.get() + notCached.get() > blocked.get() + ok.get() + failed.get());
|
||||
Assert.assertTrue(failed.get() > 0);
|
||||
Assert.assertTrue(ok.get() > 0);
|
||||
Assert.assertTrue(client.currentRuleCached(1L) >= prefetch / 2);
|
||||
}
|
||||
}
|
||||
|
|
@ -37,13 +37,14 @@ final class ClusterFlowChecker {
|
|||
|
||||
private static double calcGlobalThreshold(FlowRule rule) {
|
||||
double count = rule.getCount();
|
||||
double preCnt = rule.getClusterConfig().getPrefetchCntRatio() * count;
|
||||
switch (rule.getClusterConfig().getThresholdType()) {
|
||||
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
|
||||
return count;
|
||||
return count + preCnt;
|
||||
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
|
||||
default:
|
||||
int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
|
||||
return count * connectedCount;
|
||||
return count * connectedCount + preCnt;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,11 @@ import java.util.Collection;
|
|||
@Spi(isDefault = true)
|
||||
public class DefaultTokenService implements TokenService {
|
||||
|
||||
@Override
|
||||
public TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt) {
|
||||
return requestToken(ruleId, acquireCount, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
|
||||
if (notValidRequest(ruleId, acquireCount)) {
|
||||
|
|
|
|||
|
|
@ -43,6 +43,11 @@ public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {
|
|||
server.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt) {
|
||||
return requestToken(ruleId, acquireCount, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
|
||||
if (tokenService != null) {
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ public class TokenResult {
|
|||
|
||||
private Map<String, String> attachments;
|
||||
|
||||
private boolean isFromCached;
|
||||
|
||||
public TokenResult() {
|
||||
}
|
||||
|
||||
|
|
@ -85,6 +87,15 @@ public class TokenResult {
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean isFromCached() {
|
||||
return isFromCached;
|
||||
}
|
||||
|
||||
public TokenResult setFromCached(boolean fromCached) {
|
||||
isFromCached = fromCached;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TokenResult{" +
|
||||
|
|
@ -93,6 +104,7 @@ public class TokenResult {
|
|||
", waitInMs=" + waitInMs +
|
||||
", attachments=" + attachments +
|
||||
", tokenId=" + tokenId +
|
||||
", isFromCached=" + isFromCached +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,6 +35,15 @@ public interface TokenService {
|
|||
*/
|
||||
TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized);
|
||||
|
||||
/**
|
||||
* Request tokens from cache and remote token server.
|
||||
*
|
||||
* @param ruleId the unique rule ID
|
||||
* @param acquireCount token count to acquire
|
||||
* @return result of the token request
|
||||
*/
|
||||
TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt);
|
||||
|
||||
/**
|
||||
* Request tokens for a specific parameter from remote token server.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -118,6 +118,12 @@ public class DefaultNode extends StatisticNode {
|
|||
this.clusterNode.increaseExceptionQps(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increaseFallbackQps(int count) {
|
||||
super.increaseFallbackQps(count);
|
||||
this.clusterNode.increaseFallbackQps(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addRtAndSuccess(long rt, int successCount) {
|
||||
super.addRtAndSuccess(rt, successCount);
|
||||
|
|
|
|||
|
|
@ -67,6 +67,8 @@ public interface Node extends OccupySupport, DebugSupport {
|
|||
* @return total business exception count per minute
|
||||
*/
|
||||
long totalException();
|
||||
|
||||
long totalFallback();
|
||||
|
||||
/**
|
||||
* Get pass request per second.
|
||||
|
|
@ -109,6 +111,8 @@ public interface Node extends OccupySupport, DebugSupport {
|
|||
* @return QPS of exception occurs
|
||||
*/
|
||||
double exceptionQps();
|
||||
|
||||
double fallbackQps();
|
||||
|
||||
/**
|
||||
* Get average rt per second.
|
||||
|
|
@ -186,6 +190,8 @@ public interface Node extends OccupySupport, DebugSupport {
|
|||
*/
|
||||
void increaseExceptionQps(int count);
|
||||
|
||||
void increaseFallbackQps(int count);
|
||||
|
||||
/**
|
||||
* Increase current thread count.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -149,6 +149,7 @@ public class StatisticNode implements Node {
|
|||
@Override
|
||||
public void reset() {
|
||||
rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
|
||||
rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -196,6 +197,16 @@ public class StatisticNode implements Node {
|
|||
return rollingCounterInMinute.exception();
|
||||
}
|
||||
|
||||
@Override
|
||||
public double fallbackQps() {
|
||||
return rollingCounterInSecond.fallback() / rollingCounterInSecond.getWindowIntervalInSec();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long totalFallback() {
|
||||
return rollingCounterInMinute.fallback();
|
||||
}
|
||||
|
||||
@Override
|
||||
public double passQps() {
|
||||
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
|
||||
|
|
@ -269,6 +280,12 @@ public class StatisticNode implements Node {
|
|||
rollingCounterInMinute.addException(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increaseFallbackQps(int count) {
|
||||
rollingCounterInSecond.addFallback(count);
|
||||
rollingCounterInMinute.addFallback(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increaseThreadNum() {
|
||||
curThreadNum.increment();
|
||||
|
|
|
|||
|
|
@ -15,9 +15,10 @@
|
|||
*/
|
||||
package com.alibaba.csp.sentinel.node.metric;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
/**
|
||||
* Metrics data for a specific resource at given {@code timestamp}.
|
||||
|
|
@ -210,12 +211,19 @@ public class MetricNode {
|
|||
*
|
||||
* @return string format of this.
|
||||
*/
|
||||
|
||||
private static final DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
public String toFatString() {
|
||||
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
StringBuilder sb = new StringBuilder(32);
|
||||
sb.delete(0, sb.length());
|
||||
sb.append(getTimestamp()).append("|");
|
||||
sb.append(df.format(new Date(getTimestamp()))).append("|");
|
||||
|
||||
long timestamp = getTimestamp();
|
||||
sb.append(timestamp).append("|");
|
||||
|
||||
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
|
||||
sb.append(df.format(dateTime)).append("|");
|
||||
|
||||
String legalName = getResource().replaceAll("\\|", "_");
|
||||
sb.append(legalName).append("|");
|
||||
sb.append(getPassQps()).append("|");
|
||||
|
|
|
|||
|
|
@ -73,6 +73,27 @@ public class ClusterFlowConfig {
|
|||
*/
|
||||
private long clientOfflineTime = 2000;
|
||||
|
||||
// prefetch ratio if used to calculate prefetch count (ratio * count) for each request, prefetch can be used
|
||||
// to reduce the number of requests to the server and reduce the latency for the request.
|
||||
// 0: disable prefetch.
|
||||
// value should be in the range [0.001 ~ 0.1]
|
||||
// We suggest disable prefetch if the limit count is less than 100,
|
||||
// for count <100, suggest ratio 0.1
|
||||
// for count 100~1000, suggest ratio 0.06
|
||||
// for count 1000~10000, suggest ratio 0.04
|
||||
// for count 10000~100000, suggest ratio 0.01
|
||||
// for count 100000~1M, suggest ratio 0.01~0.002
|
||||
// note the prefetch count should less than 50% of the avg count of each client (cluster total count / nums of clients).
|
||||
private double prefetchCntRatio = 0;
|
||||
|
||||
public double getPrefetchCntRatio() {
|
||||
return prefetchCntRatio;
|
||||
}
|
||||
|
||||
public void setPrefetchCntRatio(double prefetchCntRatio) {
|
||||
this.prefetchCntRatio = prefetchCntRatio;
|
||||
}
|
||||
|
||||
public long getResourceTimeout() {
|
||||
return resourceTimeout;
|
||||
}
|
||||
|
|
@ -197,6 +218,9 @@ public class ClusterFlowConfig {
|
|||
if (acquireRefuseStrategy != that.acquireRefuseStrategy) {
|
||||
return false;
|
||||
}
|
||||
if (Double.compare(prefetchCntRatio, that.prefetchCntRatio) != 0) {
|
||||
return false;
|
||||
}
|
||||
return Objects.equals(flowId, that.flowId);
|
||||
}
|
||||
|
||||
|
|
@ -212,6 +236,7 @@ public class ClusterFlowConfig {
|
|||
result = (int) (31 * result + clientOfflineTime);
|
||||
result = 31 * result + resourceTimeoutStrategy;
|
||||
result = 31 * result + acquireRefuseStrategy;
|
||||
result = (int) (31 * result + prefetchCntRatio);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -228,6 +253,7 @@ public class ClusterFlowConfig {
|
|||
", resourceTimeoutStrategy=" + resourceTimeoutStrategy +
|
||||
", acquireRefuseStrategy=" + acquireRefuseStrategy +
|
||||
", clientOfflineTime=" + clientOfflineTime +
|
||||
", prefetchCntRatio=" + prefetchCntRatio +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -152,8 +152,15 @@ public class FlowRuleChecker {
|
|||
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
|
||||
}
|
||||
long flowId = rule.getClusterConfig().getFlowId();
|
||||
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
|
||||
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
|
||||
double ratio = rule.getClusterConfig().getPrefetchCntRatio();
|
||||
int prefetchCnt = (int)(ratio * rule.getCount());
|
||||
if (prefetchCnt > 1) {
|
||||
TokenResult result = clusterService.requestTokenWithCache(flowId, acquireCount, prefetchCnt);
|
||||
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
|
||||
} else {
|
||||
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
|
||||
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
|
||||
}
|
||||
// If client is absent, then fallback to local mode.
|
||||
} catch (Throwable ex) {
|
||||
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
|
||||
|
|
@ -166,6 +173,7 @@ public class FlowRuleChecker {
|
|||
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
|
||||
boolean prioritized) {
|
||||
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
|
||||
node.increaseFallbackQps(acquireCount);
|
||||
return passLocalCheck(rule, context, node, acquireCount, prioritized);
|
||||
} else {
|
||||
// The rule won't be activated, just pass.
|
||||
|
|
@ -192,7 +200,11 @@ public class FlowRuleChecker {
|
|||
case TokenResultStatus.SHOULD_WAIT:
|
||||
// Wait for next tick.
|
||||
try {
|
||||
Thread.sleep(result.getWaitInMs());
|
||||
int waitMs = result.getWaitInMs();
|
||||
if (waitMs > rule.getMaxQueueingTimeMs()) {
|
||||
waitMs = rule.getMaxQueueingTimeMs();
|
||||
}
|
||||
Thread.sleep(waitMs);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
@ -207,4 +219,4 @@ public class FlowRuleChecker {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,5 +35,7 @@ public enum MetricEvent {
|
|||
/**
|
||||
* Passed in future quota (pre-occupied, since 1.5.0).
|
||||
*/
|
||||
OCCUPIED_PASS
|
||||
OCCUPIED_PASS,
|
||||
// fallback pass in cluster mode
|
||||
FALLBACK
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,6 +91,10 @@ public class MetricBucket {
|
|||
return get(MetricEvent.EXCEPTION);
|
||||
}
|
||||
|
||||
public long fallback() {
|
||||
return get(MetricEvent.FALLBACK);
|
||||
}
|
||||
|
||||
public long rt() {
|
||||
return get(MetricEvent.RT);
|
||||
}
|
||||
|
|
@ -115,6 +119,10 @@ public class MetricBucket {
|
|||
add(MetricEvent.EXCEPTION, n);
|
||||
}
|
||||
|
||||
public void addFallback(int n) {
|
||||
add(MetricEvent.FALLBACK, n);
|
||||
}
|
||||
|
||||
public void addBlock(int n) {
|
||||
add(MetricEvent.BLOCK, n);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -93,6 +93,17 @@ public class ArrayMetric implements Metric {
|
|||
return exception;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long fallback() {
|
||||
data.currentWindow();
|
||||
long fallback = 0;
|
||||
List<MetricBucket> list = data.values();
|
||||
for (MetricBucket window : list) {
|
||||
fallback += window.fallback();
|
||||
}
|
||||
return fallback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long block() {
|
||||
data.currentWindow();
|
||||
|
|
@ -216,6 +227,12 @@ public class ArrayMetric implements Metric {
|
|||
wrap.value().addException(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFallback(int count) {
|
||||
WindowWrap<MetricBucket> wrap = data.currentWindow();
|
||||
wrap.value().addFallback(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBlock(int count) {
|
||||
WindowWrap<MetricBucket> wrap = data.currentWindow();
|
||||
|
|
|
|||
|
|
@ -50,6 +50,13 @@ public interface Metric extends DebugSupport {
|
|||
*/
|
||||
long exception();
|
||||
|
||||
/**
|
||||
* Get total fallback count.
|
||||
*
|
||||
* @return fallback count
|
||||
*/
|
||||
long fallback();
|
||||
|
||||
/**
|
||||
* Get total block count.
|
||||
*
|
||||
|
|
@ -108,6 +115,13 @@ public interface Metric extends DebugSupport {
|
|||
*/
|
||||
void addException(int n);
|
||||
|
||||
/**
|
||||
* Add current fallback count.
|
||||
*
|
||||
* @param n count to add
|
||||
*/
|
||||
void addFallback(int n);
|
||||
|
||||
/**
|
||||
* Add current block count.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -156,7 +156,7 @@ public class EagleEyeCoreUtilsTest {
|
|||
@Test
|
||||
public void testFormatTime() {
|
||||
Assert.assertEquals("2019-06-15 12:13:14.000",
|
||||
EagleEyeCoreUtils.formatTime(1560600794000L - TimeZone.getDefault().getRawOffset()));
|
||||
EagleEyeCoreUtils.formatTime(1560600794000L - TimeZone.getDefault().getOffset(1560600794000L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -66,4 +66,4 @@ public class SentinelAspectConfiguration {
|
|||
}
|
||||
```
|
||||
|
||||
An example for using Sentinel Annotation AspectJ with Spring Boot can be found [here](https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-annotation-spring-aop).
|
||||
An example for using Sentinel Annotation AspectJ with Spring Boot can be found in [sentinel-demo/sentinel-demo-annotation-spring-aop](https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-annotation-spring-aop).
|
||||
|
|
|
|||
|
|
@ -37,11 +37,6 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
|
|||
@Override
|
||||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
|
||||
boolean prioritized, Object... args) throws Throwable {
|
||||
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
|
||||
fireEntry(context, resourceWrapper, node, count, prioritized, args);
|
||||
return;
|
||||
}
|
||||
|
||||
checkFlow(resourceWrapper, count, args);
|
||||
fireEntry(context, resourceWrapper, node, count, prioritized, args);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,9 +39,11 @@ public class NodeVo {
|
|||
private Long averageRt;
|
||||
private Long successQps;
|
||||
private Long exceptionQps;
|
||||
private Long fallbackQps;
|
||||
private Long oneMinutePass;
|
||||
private Long oneMinuteBlock;
|
||||
private Long oneMinuteException;
|
||||
private Long oneMinuteFallback;
|
||||
private Long oneMinuteTotal;
|
||||
|
||||
private Long timestamp;
|
||||
|
|
@ -69,7 +71,9 @@ public class NodeVo {
|
|||
vo.averageRt = (long) node.avgRt();
|
||||
vo.successQps = (long) node.successQps();
|
||||
vo.exceptionQps = (long) node.exceptionQps();
|
||||
vo.fallbackQps = (long) node.fallbackQps();
|
||||
vo.oneMinuteException = node.totalException();
|
||||
vo.oneMinuteFallback = node.totalFallback();
|
||||
vo.oneMinutePass = node.totalRequest() - node.blockRequest();
|
||||
vo.oneMinuteBlock = node.blockRequest();
|
||||
vo.oneMinuteTotal = node.totalRequest();
|
||||
|
|
@ -108,7 +112,9 @@ public class NodeVo {
|
|||
vo.averageRt = (long) node.avgRt();
|
||||
vo.successQps = (long) node.successQps();
|
||||
vo.exceptionQps = (long) node.exceptionQps();
|
||||
vo.fallbackQps = (long) node.fallbackQps();
|
||||
vo.oneMinuteException = node.totalException();
|
||||
vo.oneMinuteFallback = node.totalFallback();
|
||||
vo.oneMinutePass = node.totalRequest() - node.blockRequest();
|
||||
vo.oneMinuteBlock = node.blockRequest();
|
||||
vo.oneMinuteTotal = node.totalRequest();
|
||||
|
|
@ -204,6 +210,22 @@ public class NodeVo {
|
|||
this.oneMinuteException = oneMinuteException;
|
||||
}
|
||||
|
||||
public Long getFallbackQps() {
|
||||
return fallbackQps;
|
||||
}
|
||||
|
||||
public void setFallbackQps(Long fallbackQps) {
|
||||
this.fallbackQps = fallbackQps;
|
||||
}
|
||||
|
||||
public Long getOneMinuteFallback() {
|
||||
return oneMinuteFallback;
|
||||
}
|
||||
|
||||
public void setOneMinuteFallback(Long oneMinuteFallback) {
|
||||
this.oneMinuteFallback = oneMinuteFallback;
|
||||
}
|
||||
|
||||
public Long getOneMinutePass() {
|
||||
return oneMinutePass;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue