Compare commits
3 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
7f69c8f487 | |
|
|
04c1071fb4 | |
|
|
b12493e1a1 |
|
|
@ -130,13 +130,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
|||
.onErrorResume(err -> {
|
||||
log.error(err.getMessage(), err);
|
||||
return Mono.empty();
|
||||
})
|
||||
.as(MonoTracer
|
||||
.create(SpanName.connection(connection.getClientId()),
|
||||
builder -> {
|
||||
builder.setAttribute(clientId, connection.getClientId());
|
||||
builder.setAttribute(SpanKey.address, connection.getClientAddress().toString());
|
||||
})),
|
||||
}),
|
||||
Integer.MAX_VALUE)
|
||||
.subscribe();
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
package org.jetlinks.community.script;
|
||||
|
||||
import org.jetlinks.community.script.context.ExecutionContext;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
|
|
@ -87,6 +89,12 @@ public interface ScriptFactory {
|
|||
* @return 接口代理实现
|
||||
*/
|
||||
<T> T bind(Script script,
|
||||
Class<T> interfaceType);
|
||||
Class<T> interfaceType,
|
||||
ExecutionContext context);
|
||||
|
||||
default <T> T bind(Script script,
|
||||
Class<T> interfaceType) {
|
||||
return bind(script, interfaceType, ExecutionContext.create());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -96,12 +96,12 @@ public abstract class Jsr223ScriptFactory extends AbstractScriptFactory {
|
|||
|
||||
@Override
|
||||
@SuppressWarnings("all")
|
||||
public final <T> T bind(Script script, Class<T> interfaceType) {
|
||||
public final <T> T bind(Script script, Class<T> interfaceType,ExecutionContext context) {
|
||||
String returns = createFunctionMapping(interfaceType.getDeclaredMethods());
|
||||
String content = script.getContent() + "\n return " + returns + ";";
|
||||
|
||||
CompiledScript compiledScript = compile(script.content(content), false);
|
||||
Object source = compiledScript.call(Collections.emptyMap());
|
||||
Object source = compiledScript.call(context);
|
||||
Set<Method> ignoreMethods = new HashSet<>();
|
||||
|
||||
return (T) Proxy.newProxyInstance(
|
||||
|
|
|
|||
|
|
@ -1,15 +1,19 @@
|
|||
package org.jetlinks.community.device.message.transparent.script;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.hswebframework.web.exception.ValidationException;
|
||||
import org.jetlinks.community.device.message.transparent.SimpleTransparentMessageCodec;
|
||||
import org.jetlinks.community.device.message.transparent.TransparentMessageCodec;
|
||||
import org.jetlinks.community.device.message.transparent.TransparentMessageCodecProvider;
|
||||
import org.jetlinks.community.script.Script;
|
||||
import org.jetlinks.community.script.ScriptFactory;
|
||||
import org.jetlinks.community.script.Scripts;
|
||||
import org.jetlinks.community.script.context.ExecutionContext;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.Assert;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
|
@ -28,12 +32,15 @@ public class Jsr223TransparentMessageCodecProvider implements TransparentMessage
|
|||
Assert.hasText(lang, "lang can not be null");
|
||||
Assert.hasText(script, "script can not be null");
|
||||
|
||||
CodecContext context = new CodecContext();
|
||||
ScriptFactory factory = Scripts.getFactory(lang);
|
||||
|
||||
CodecContext context = new CodecContext(factory);
|
||||
|
||||
SimpleTransparentMessageCodec.Codec codec = factory.bind(
|
||||
Script.of("jsr223-transparent", script),
|
||||
SimpleTransparentMessageCodec.Codec.class,
|
||||
ExecutionContext.create(Collections.singletonMap("codec", context)));
|
||||
|
||||
SimpleTransparentMessageCodec.Codec codec = Scripts
|
||||
.getFactory(lang)
|
||||
.bind(Script.of("jsr223-transparent", script),
|
||||
SimpleTransparentMessageCodec.Codec.class);
|
||||
|
||||
if (context.encoder == null && codec != null) {
|
||||
context.onDownstream(codec::encode);
|
||||
|
|
@ -52,8 +59,9 @@ public class Jsr223TransparentMessageCodecProvider implements TransparentMessage
|
|||
));
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
public static class CodecContext implements SimpleTransparentMessageCodec.Codec {
|
||||
|
||||
private final ScriptFactory factory;
|
||||
private Function<SimpleTransparentMessageCodec.EncodeContext, Object> encoder;
|
||||
private Function<SimpleTransparentMessageCodec.DecodeContext, Object> decoder;
|
||||
|
||||
|
|
@ -70,7 +78,7 @@ public class Jsr223TransparentMessageCodecProvider implements TransparentMessage
|
|||
if (decoder == null) {
|
||||
return null;
|
||||
}
|
||||
return decoder.apply(context);
|
||||
return factory.convertToJavaType(decoder.apply(context));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -78,7 +86,7 @@ public class Jsr223TransparentMessageCodecProvider implements TransparentMessage
|
|||
if (encoder == null) {
|
||||
return null;
|
||||
}
|
||||
return encoder.apply(context);
|
||||
return factory.convertToJavaType(encoder.apply(context));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,14 +7,37 @@ import org.jetlinks.community.rule.engine.scene.term.TermColumn;
|
|||
import org.jetlinks.community.rule.engine.scene.value.TermValue;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class SceneUtils {
|
||||
|
||||
public static String createColumnAlias(String prefix,String column, boolean wrapColumn) {
|
||||
if (!column.contains(".")) {
|
||||
return wrapColumn ? wrapColumnName(column) : column;
|
||||
}
|
||||
String[] arr = column.split("[.]");
|
||||
String alias;
|
||||
//prefix.temp.current
|
||||
if (prefix.equals(arr[0])) {
|
||||
String property = arr[1];
|
||||
alias = property + "_" + arr[arr.length - 1];
|
||||
} else {
|
||||
if (arr.length > 1) {
|
||||
alias = String.join("_", Arrays.copyOfRange(arr, 1, arr.length));
|
||||
} else {
|
||||
alias = column.replace(".", "_");
|
||||
}
|
||||
}
|
||||
return wrapColumn ? wrapColumnName(alias) : alias;
|
||||
}
|
||||
|
||||
public static String wrapColumnName(String column) {
|
||||
if (column.startsWith("\"") && column.endsWith("\"")) {
|
||||
return column;
|
||||
}
|
||||
return "\"" + (column.replace("\"", "\\\"")) + "\"";
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据条件和可选的条件列解析出将要输出的变量信息
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import org.hswebframework.ezorm.rdb.executor.PrepareSqlRequest;
|
|||
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
|
||||
import org.hswebframework.ezorm.rdb.operator.builder.fragments.AbstractTermsFragmentBuilder;
|
||||
import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments;
|
||||
import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql;
|
||||
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
|
||||
import org.hswebframework.web.bean.FastBeanCopier;
|
||||
import org.hswebframework.web.i18n.LocaleUtils;
|
||||
|
|
@ -48,6 +49,8 @@ import java.util.*;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.jetlinks.community.rule.engine.scene.SceneRule.DEFAULT_FILTER_TABLE;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerProvider.TriggerConfig, Serializable {
|
||||
|
|
@ -175,37 +178,6 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
|
|||
return CollectionUtils.isEmpty(terms) ? EmptySqlFragments.INSTANCE : termBuilder.createTermFragments(this, terms);
|
||||
}
|
||||
|
||||
Function<Map<String, Object>, Mono<Boolean>> createFilter(List<Term> terms) {
|
||||
SqlFragments fragments = CollectionUtils.isEmpty(terms) ? EmptySqlFragments.INSTANCE : termBuilder.createTermFragments(this, terms);
|
||||
if (!fragments.isEmpty()) {
|
||||
SqlRequest request = fragments.toRequest();
|
||||
String sql = "select 1 from t where " + request.getSql();
|
||||
ReactorQL ql = ReactorQL
|
||||
.builder()
|
||||
.sql(sql)
|
||||
.build();
|
||||
List<Object> args = Arrays.asList(request.getParameters());
|
||||
String sqlString = request.toNativeSql();
|
||||
return new Function<Map<String, Object>, Mono<Boolean>>() {
|
||||
@Override
|
||||
public Mono<Boolean> apply(Map<String, Object> map) {
|
||||
ReactorQLContext context = new DefaultReactorQLContext((t) -> Flux.just(map), args);
|
||||
return ql
|
||||
.start(context)
|
||||
.hasElements();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return sqlString;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return ignore -> Reactors.ALWAYS_TRUE;
|
||||
|
||||
}
|
||||
|
||||
private String createFromTable() {
|
||||
String topic = null;
|
||||
|
||||
|
|
@ -230,12 +202,13 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
|
|||
if (!StringUtils.hasText(selector)) {
|
||||
selector = "all";
|
||||
}
|
||||
String scope;
|
||||
switch (selector) {
|
||||
case "all":
|
||||
topic = String.format(topic, "*");
|
||||
break;
|
||||
case "fixed":
|
||||
String scope = getSelectorValues()
|
||||
scope = getSelectorValues()
|
||||
.stream()
|
||||
.map(SelectorValue::getValue)
|
||||
.map(String::valueOf)
|
||||
|
|
@ -273,7 +246,7 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
|
|||
.lookupSupport(termType)
|
||||
.orElseThrow(() -> new UnsupportedOperationException("unsupported termType " + termType));
|
||||
|
||||
Term copy = refactorTermValue("t", term.clone());
|
||||
Term copy = refactorTermValue(DEFAULT_FILTER_TABLE, term.clone());
|
||||
|
||||
return support.createSql(copy.getColumn(), copy.getValue(), term);
|
||||
}
|
||||
|
|
@ -330,30 +303,42 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
if (!term.getOptions().contains(TermType.OPTIONS_NATIVE_SQL)) {
|
||||
String column;
|
||||
// properties.xxx.last的场景
|
||||
if (arr.length > 3 && arr[0].equals("properties")) {
|
||||
column = tableName + "['" + createColumnAlias(term.getColumn(), false) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
|
||||
} else if (!isBranchTerm(arr[0])) {
|
||||
column = tableName + "['" + createColumnAlias(term.getColumn(), false) + "']";
|
||||
} else {
|
||||
column = term.getColumn();
|
||||
}
|
||||
term.setColumn(column);
|
||||
String column;
|
||||
// properties.xxx.last的场景
|
||||
if (arr.length > 3 && arr[0].equals("properties")) {
|
||||
column = tableName + "['" + createColumnAlias(term.getColumn(), false) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
|
||||
} else if (!isDirectTerm(arr[0])) {
|
||||
column = tableName + "['" + createColumnAlias(term.getColumn(), false) + "']";
|
||||
} else {
|
||||
column = term.getColumn();
|
||||
}
|
||||
|
||||
if (term.getOptions().contains(TermType.OPTIONS_NATIVE_SQL)) {
|
||||
val = NativeSql.of(String.valueOf(val));
|
||||
}
|
||||
|
||||
term.setColumn(column);
|
||||
|
||||
term.setValue(val);
|
||||
|
||||
return term;
|
||||
}
|
||||
|
||||
private static boolean isDirectTerm(String column) {
|
||||
//直接term,构建Condition输出条件时使用
|
||||
return isBranchTerm(column) || isSceneTerm(column);
|
||||
}
|
||||
|
||||
private static boolean isBranchTerm(String column) {
|
||||
return column.startsWith("branch_") &&
|
||||
column.contains("_group_")
|
||||
&& column.contains("_action_");
|
||||
}
|
||||
|
||||
private static boolean isSceneTerm(String column) {
|
||||
return column.startsWith("scene");
|
||||
}
|
||||
|
||||
static String parseProperty(String column) {
|
||||
String[] arr = column.split("[.]");
|
||||
|
||||
|
|
@ -377,9 +362,9 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
|
|||
case current:
|
||||
return "this['properties." + property + "']";
|
||||
case recent:
|
||||
return "coalesce(this['properties." + property + "']" + ",device.property.recent(deviceId,'" + property + "',timestamp))";
|
||||
return "coalesce(this['properties." + property + "']" + ",device.property.recent(deviceId,'" + property + "',timestamp - 1))";
|
||||
case last:
|
||||
return "device.property.recent(deviceId,'" + property + "',timestamp)";
|
||||
return "device.property.recent(deviceId,'" + property + "',timestamp - 1)";
|
||||
}
|
||||
} catch (IllegalArgumentException ignore) {
|
||||
|
||||
|
|
@ -389,35 +374,13 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
|
|||
}
|
||||
|
||||
static String createColumnAlias(String column, boolean wrapColumn) {
|
||||
if (!column.contains(".")) {
|
||||
return wrapColumn ? wrapColumnName(column) : column;
|
||||
}
|
||||
String[] arr = column.split("[.]");
|
||||
String alias;
|
||||
//properties.temp.current
|
||||
if ("properties".equals(arr[0])) {
|
||||
String property = arr[1];
|
||||
alias = property + "_" + arr[arr.length - 1];
|
||||
} else {
|
||||
if (arr.length > 1) {
|
||||
alias = String.join("_", Arrays.copyOfRange(arr, 1, arr.length));
|
||||
} else {
|
||||
alias = column.replace(".", "_");
|
||||
}
|
||||
}
|
||||
return wrapColumn ? wrapColumnName(alias) : alias;
|
||||
return SceneUtils.createColumnAlias("properties", column, wrapColumn);
|
||||
}
|
||||
|
||||
static String createColumnAlias(String column) {
|
||||
return createColumnAlias(column, true);
|
||||
}
|
||||
|
||||
static String wrapColumnName(String column) {
|
||||
if (column.startsWith("\"") && column.endsWith("\"")) {
|
||||
return column;
|
||||
}
|
||||
return "\"" + (column.replace("\"", "\\\"")) + "\"";
|
||||
}
|
||||
|
||||
public List<Variable> createDefaultVariable() {
|
||||
return Arrays.asList(
|
||||
|
|
@ -518,5 +481,4 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
|
|||
operation.validate();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue