Compare commits

...

2 Commits

Author SHA1 Message Date
zhouhao 419854fdc4 refactor: 优化提示 2025-05-23 10:44:30 +08:00
zhouhao ee478933ab feat: 增加插件功能支持 2025-05-22 11:41:36 +08:00
55 changed files with 3745 additions and 2 deletions

View File

@ -28,7 +28,8 @@ public interface DataReferenceManager {
String TYPE_RELATION = "relation";
//数据类型消息协议
String TYPE_PROTOCOL = "protocol";
//数据类型插件
String TYPE_PLUGIN = "plugin";
/**
* 判断指定数据类型的数据是否已经被其他地方所引用
*

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.gateway;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.core.Wrapper;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.Transport;
import reactor.core.Disposable;
@ -16,7 +17,7 @@ import java.util.function.BiConsumer;
* @version 1.0
* @since 1.0
*/
public interface DeviceGateway {
public interface DeviceGateway extends Wrapper {
/**
* @return 网关ID

View File

@ -49,6 +49,16 @@ public interface DeviceGatewayManager {
*/
Mono<Void> start(String id);
/**
* 重新加载当前节点的网关
*
* @param gatewayId 网关ID
* @return void
* @since 2.2
*/
Mono<Void> reloadLocal(String gatewayId);
/**
* 获取接入网关通道信息,通道中包含接入地址等信息
*

View File

@ -82,6 +82,11 @@ public class DefaultDeviceGatewayManager implements DeviceGatewayManager {
.doStart(gatewayId);
}
@Override
public Mono<Void> reloadLocal(String gatewayId) {
return this.doReload(gatewayId);
}
@Override
public Mono<DeviceGateway> getGateway(String id) {
return doGetGateway(id);

View File

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-components</artifactId>
<version>2.10.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>plugin-component</artifactId>
<dependencies>
<dependency>
<groupId>org.jetlinks.plugin</groupId>
<artifactId>plugin-internal</artifactId>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>gateway-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>things-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>io-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>script-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
package org.jetlinks.community.plugin;
import org.jetlinks.core.lang.SeparatedCharSequence;
import org.jetlinks.core.lang.SharedPathString;
import org.jetlinks.core.utils.StringBuilderUtils;
public interface PluginConstants {
/**
* 创建用于访问插件命令的服务ID,可通过{@link org.jetlinks.community.command.CommandSupportManagerProviders#getCommandSupport(String)}来获取插件命令方法支持.
*
* @param pluginId 插件ID
* @return 访问插件的服务ID
* @see org.jetlinks.community.command.CommandSupportManagerProvider
*/
static String createCommandServiceId(String pluginId) {
return "plugin$" + pluginId;
}
interface Topics {
SharedPathString ALL_PLUGIN_LOG = SharedPathString.of("/plugin/*/log/*");
/**
* <code> /plugin/{pid}/log</code>
*
* @param pluginId 插件ID
* @return topic
* @see org.jetlinks.core.event.EventBus
* @see org.jetlinks.community.log.LogRecord
*/
static SeparatedCharSequence pluginLog(String pluginId, String level) {
return ALL_PLUGIN_LOG.replace(2, pluginId, 4, level);
}
}
}

View File

@ -0,0 +1,20 @@
package org.jetlinks.community.plugin;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.ValueObject;
import java.util.Map;
@Getter
@Setter
public class PluginDriverConfig implements ValueObject {
private String id;
private String provider;
private Map<String,Object> configuration;
@Override
public Map<String, Object> values() {
return configuration;
}
}

View File

@ -0,0 +1,41 @@
package org.jetlinks.community.plugin;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.community.plugin.impl.PluginDriverInstallerProvider;
import reactor.core.publisher.Mono;
/**
* 插件驱动安装器,用于根据查看驱动配置安装,卸载插件驱动
*
* @author zhouhao
* @see PluginDriverInstallerProvider
* @since 2.0
*/
public interface PluginDriverInstaller {
/**
* 安装驱动
*
* @param config 驱动配置
* @return 驱动
*/
Mono<PluginDriver> install(PluginDriverConfig config);
/**
* 重新加载驱动
* @param driver 旧驱动
* @param config 驱动配置
* @return 驱动
*/
Mono<PluginDriver> reload(PluginDriver driver,
PluginDriverConfig config);
/**
* 卸载驱动
*
* @param config 驱动配置
* @return void
*/
Mono<Void> uninstall(PluginDriverConfig config);
}

View File

@ -0,0 +1,61 @@
package org.jetlinks.community.plugin;
import org.jetlinks.plugin.core.PluginDriver;
import reactor.core.publisher.Mono;
/**
* 插件驱动监听器
*
* @author zhouhao
* @since 2.1
*/
public interface PluginDriverListener {
/**
* 当插件安装时触发
*
* @param driverId 驱动ID
* @param driver 驱动
* @return void
*/
default Mono<Void> onInstall(String driverId,
PluginDriver driver) {
return Mono.empty();
}
/**
* 当插件重新加载时触发
*
* @param driverId 驱动ID
* @param driver 驱动
* @return void
*/
default Mono<Void> onReload(String driverId,
PluginDriver oldDriver,
PluginDriver driver) {
return Mono.empty();
}
/**
* 当插件卸载时触发
*
* @param driverId 驱动ID
* @param driver 驱动
* @return void
*/
default Mono<Void> onUninstall(String driverId,
PluginDriver driver) {
return Mono.empty();
}
/**
* 当插件管理器启动完成时触发
*
* @return void
*/
default Mono<Void> onStartup() {
return Mono.empty();
}
}

View File

@ -0,0 +1,38 @@
package org.jetlinks.community.plugin;
import org.jetlinks.plugin.core.PluginDriver;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 插件驱动管理器
*
* @author zhouhao
* @since 2.0
*/
public interface PluginDriverManager {
/**
* 获取全部已加载的驱动信息
*
* @return 驱动信息
*/
Flux<PluginDriver> getDrivers();
/**
* 根据ID获取驱动信息
*
* @param id ID
* @return 驱动信息
*/
Mono<PluginDriver> getDriver(String id);
/**
* 监听驱动相关事件,可通过返回值{@link Disposable#dispose()} 来取消监听
*
* @param listener 监听器
* @return Disposable
*/
Disposable listen(PluginDriverListener listener);
}

View File

@ -0,0 +1,146 @@
package org.jetlinks.community.plugin.configuration;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.annotation.EnableEasyormRepository;
import org.jetlinks.community.plugin.impl.*;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.plugin.core.PluginRegistry;
import org.jetlinks.plugin.core.ServiceRegistry;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import org.jetlinks.plugin.internal.device.PluginDeviceGatewayService;
import org.jetlinks.plugin.internal.device.PluginDeviceManager;
import org.jetlinks.community.gateway.DeviceGatewayManager;
import org.jetlinks.community.plugin.PluginDriverInstaller;
import org.jetlinks.community.plugin.PluginDriverManager;
import org.jetlinks.community.plugin.context.SpringServiceRegistry;
import org.jetlinks.community.plugin.device.DefaultPluginDeviceManager;
import org.jetlinks.community.plugin.device.PluginDeviceGatewayProvider;
import org.jetlinks.community.plugin.device.PluginDeviceGatewayServiceImpl;
import org.jetlinks.community.plugin.impl.id.DefaultPluginDataIdMapper;
import org.jetlinks.community.plugin.impl.id.PluginDataIdMappingEntity;
import org.jetlinks.community.plugin.impl.jar.JarPluginDriverInstallerProvider;
import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.web.reactive.function.client.WebClient;
@AutoConfiguration
@EnableEasyormRepository(
{
"org.jetlinks.community.plugin.impl.PluginDriverEntity",
"org.jetlinks.community.plugin.impl.id.PluginDataIdMappingEntity",
"org.jetlinks.community.plugin.impl.standalone.StandalonePluginEntity"
})
public class PluginAutoConfiguration {
@Bean
public DefaultPluginDataIdMapper pluginDataIdMapper(ReactiveRepository<PluginDataIdMappingEntity, String> repository,
ConfigStorageManager storageManager) {
return new DefaultPluginDataIdMapper(repository, storageManager);
}
@Bean
public JarPluginDriverInstallerProvider jarPluginDriverLoaderProvider(WebClient.Builder builder) {
return new JarPluginDriverInstallerProvider(builder);
}
@Bean
@Primary
public DefaultPluginDriverInstaller pluginDriverLoader(ObjectProvider<PluginDriverInstallerProvider> loaders) {
DefaultPluginDriverInstaller loader = new DefaultPluginDriverInstaller();
loaders.forEach(loader::addProvider);
return loader;
}
@Bean
public SpringServiceRegistry springServiceRegistry(ApplicationContext context) {
return new SpringServiceRegistry(context);
}
@Bean
public DefaultPluginDriverManager pluginDriverManager(ReactiveRepository<PluginDriverEntity, String> repository,
PluginDriverInstaller loader) {
return new DefaultPluginDriverManager(repository, loader);
}
@Bean
@ConditionalOnMissingBean(PluginRegistry.class)
public NonePluginRegistry nonePluginRegistry() {
return new NonePluginRegistry();
}
@Bean
public PluginI18nMessageSource pluginI18nMessageSource(PluginDriverManager driverManager) {
PluginI18nMessageSource messageSource = new PluginI18nMessageSource();
driverManager.listen(messageSource);
return messageSource;
}
@AutoConfiguration
@ConditionalOnProperty(prefix = "jetlinks.device.gateway.plugin", value = "enabled", havingValue = "true", matchIfMissing = true)
static class DevicePluginAutoConfiguration {
@Bean
@ConditionalOnBean(DeviceGatewayManager.class)
public PluginDriverHandler pluginDriverHandler(DeviceGatewayManager deviceGatewayManager,
DataReferenceManager referenceManager,
PluginDriverManager driverManager) {
return new PluginDriverHandler(deviceGatewayManager, referenceManager, driverManager);
}
@Bean
@ConditionalOnBean({
DeviceSessionManager.class,
DecodedClientMessageHandler.class
})
@ConditionalOnMissingBean(PluginDeviceGatewayService.class)
public PluginDeviceGatewayServiceImpl pluginDeviceGatewayService(
DeviceRegistry registry,
DeviceSessionManager sessionManager,
DecodedClientMessageHandler handler,
PluginDataIdMapper dataIdMapper) {
return new PluginDeviceGatewayServiceImpl(registry, sessionManager, handler, dataIdMapper);
}
@Bean
@ConditionalOnBean(PluginDeviceGatewayService.class)
public PluginDeviceGatewayProvider pluginDeviceGatewayProvider(PluginRegistry registry,
PluginDriverManager driverManager,
ServiceRegistry serviceRegistry,
PluginDataIdMapper idMapper,
DeviceRegistry deviceRegistry,
PluginDeviceGatewayService gatewayService,
EventBus eventBus) {
return new PluginDeviceGatewayProvider(registry,
driverManager,
serviceRegistry,
idMapper,
deviceRegistry,
gatewayService,
eventBus);
}
@Bean
@ConditionalOnBean(DeviceRegistry.class)
@ConditionalOnMissingBean(PluginDeviceManager.class)
public DefaultPluginDeviceManager pluginDeviceManager(DeviceRegistry registry,
PluginDataIdMapper idMapper) {
return new DefaultPluginDeviceManager(registry, idMapper);
}
}
}

View File

@ -0,0 +1,136 @@
package org.jetlinks.community.plugin.context;
import io.opentelemetry.api.common.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.monitor.Monitor;
import org.jetlinks.plugin.core.PluginScheduler;
import org.jetlinks.community.TimerSpec;
import org.jetlinks.community.utils.ReactorUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 支持集群的插件调度器
*
* @author zhouhao
* @since 2.0
*/
@Slf4j
public class ClusterPluginScheduler implements PluginScheduler, Disposable {
private final Map<String, Disposable> jobs = new ConcurrentHashMap<>();
static final AttributeKey<String> JOB_NAME = AttributeKey.stringKey("jobName");
private final String pluginId;
private final Monitor monitor;
public ClusterPluginScheduler(String pluginId) {
this(pluginId, Monitor.noop());
}
public ClusterPluginScheduler(String pluginId, Monitor monitor) {
this.pluginId = pluginId;
this.monitor = monitor;
}
private String createJobName(String name) {
return "plugin:" + pluginId + ":" + name;
}
@Override
public Disposable interval(String name, Mono<Void> job, String cronExpression, boolean singleton) {
Mono<Void> wrap = wrapJob(name, job);
// 🌟企业版支持集群调度.
Flux<Long> timer = TimerSpec.cron(cronExpression).flux();
Disposable timerJob = timer
.onBackpressureDrop(num -> monitor
.logger()
.warn("execute cron [{}] job [{}] dropped", cronExpression, name))
.concatMap(ignore -> {
monitor
.logger()
.debug("execute cron [{}] job [{}]", cronExpression, name);
return wrap;
})
.subscribe();
ReactorUtils.dispose(jobs.put(name, timerJob));
return () -> {
monitor
.logger()
.debug("stop cron [{}] job [{}]", cronExpression, name);
jobs.remove(name, timerJob);
timerJob.dispose();
};
}
private Mono<Void> wrapJob(String name, Mono<Void> job) {
return job
.as(monitor
.tracer()
.traceMono("/interval",
(contextView, spanBuilder) -> spanBuilder.setAttribute(JOB_NAME, name)))
.onErrorResume(err -> {
monitor
.logger()
.warn("execute job [{}] error", name, err);
return Mono.empty();
});
}
@Override
public Disposable interval(String name, Mono<Void> job, Duration interval, boolean singleton) {
Mono<Void> wrap = wrapJob(name, job);
Flux<Long> timer = Flux.interval(interval);
Disposable timerJob = timer
.onBackpressureDrop(num -> {
monitor
.logger()
.warn("interval [{}] job [{}] dropped!", interval, name);
})
.concatMap(ignore -> {
monitor
.logger()
.debug("execute interval [{}] job [{}]", interval, name);
return wrap;
})
.subscribe();
jobs.put(name, timerJob);
return () -> {
jobs.remove(name, timerJob);
timerJob.dispose();
};
}
@Override
public void cancel(String name) {
ReactorUtils.dispose(jobs.remove(name));
}
@Override
public Disposable delay(Mono<Void> mono, Duration duration) {
return Mono
.delay(duration)
.then(mono)
.subscribe();
}
@Override
public void dispose() {
jobs.values().forEach(Disposable::dispose);
}
}

View File

@ -0,0 +1,51 @@
package org.jetlinks.community.plugin.context;
import org.jetlinks.core.command.AsyncProxyCommandSupport;
import org.jetlinks.core.command.CommandSupport;
import org.jetlinks.plugin.core.ServiceRegistry;
import org.jetlinks.plugin.internal.functional.FunctionalService;
import org.jetlinks.community.command.CommandSupportManagerProvider;
import org.jetlinks.community.command.CommandSupportManagerProviders;
import reactor.core.publisher.Mono;
import java.util.*;
public class CommandServiceRegistry implements ServiceRegistry {
public static final CommandServiceRegistry INSTANCE = new CommandServiceRegistry();
public static ServiceRegistry instance() {
return INSTANCE;
}
@Override
public <T> Optional<T> getService(Class<T> type) {
return Optional.empty();
}
@Override
@SuppressWarnings("all")
public <T> Optional<T> getService(Class<T> type, String name) {
//todo 限制可访问的命令服务?
if ((type == FunctionalService.class || type == CommandSupport.class)) {
return Optional.of((T) new CommandFunctionalServiceIml(
CommandSupportManagerProviders
.getCommandSupport(name, Collections.emptyMap())));
}
return Optional.empty();
}
@Override
public <T> List<T> getServices(Class<T> type) {
return Collections.emptyList();
}
static class CommandFunctionalServiceIml extends AsyncProxyCommandSupport implements FunctionalService {
public CommandFunctionalServiceIml(Mono<CommandSupport> asyncCommand) {
super(asyncCommand);
}
}
}

View File

@ -0,0 +1,47 @@
package org.jetlinks.community.plugin.context;
import lombok.AllArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.jetlinks.plugin.core.ServiceRegistry;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@AllArgsConstructor
public class CompositeServiceRegistry implements ServiceRegistry {
private final List<ServiceRegistry> registries;
@Override
public <T> Optional<T> getService(Class<T> type) {
for (ServiceRegistry registry : registries) {
Optional<T> service = registry.getService(type);
if (service.isPresent()) {
return service;
}
}
return Optional.empty();
}
@Override
public <T> Optional<T> getService(Class<T> type, String name) {
for (ServiceRegistry registry : registries) {
Optional<T> service = registry.getService(type, name);
if (service.isPresent()) {
return service;
}
}
return Optional.empty();
}
@Override
public <T> List<T> getServices(Class<T> type) {
for (ServiceRegistry registry : registries) {
List<T> service = registry.getServices(type);
if (CollectionUtils.isNotEmpty(service)) {
return service;
}
}
return Collections.emptyList();
}
}

View File

@ -0,0 +1,88 @@
package org.jetlinks.community.plugin.context;
import org.jetlinks.core.monitor.Monitor;
import org.jetlinks.plugin.core.*;
import java.io.File;
public class DefaultPluginContext implements PluginContext {
private final PluginRegistry pluginRegistry;
private final ServiceRegistry serviceRegistry;
private final PluginEnvironment environment;
private final PluginMetrics metrics;
private final PluginScheduler scheduler;
private final File workDir;
private DefaultPluginContext(PluginRegistry pluginRegistry,
ServiceRegistry serviceRegistry,
PluginEnvironment environment,
PluginMetrics metrics,
PluginScheduler scheduler,
File workDir) {
this.pluginRegistry = pluginRegistry;
this.serviceRegistry = serviceRegistry;
this.environment = environment;
this.metrics = metrics;
this.scheduler = scheduler;
this.workDir = workDir;
}
@Deprecated
public static DefaultPluginContext of(PluginRegistry pluginRegistry,
ServiceRegistry serviceRegistry,
PluginEnvironment environment,
PluginMetrics metrics,
PluginScheduler scheduler,
File workDir) {
return new DefaultPluginContext(pluginRegistry, serviceRegistry, environment, metrics, scheduler, workDir);
}
public static DefaultPluginContext of(PluginRegistry pluginRegistry,
ServiceRegistry serviceRegistry,
PluginEnvironment environment,
Monitor monitor,
PluginScheduler scheduler,
File workDir) {
return new DefaultPluginContext(pluginRegistry,
serviceRegistry,
environment,
MonitorPluginMetrics.of(monitor),
scheduler,
workDir);
}
@Override
public PluginRegistry registry() {
return pluginRegistry;
}
@Override
public ServiceRegistry services() {
return serviceRegistry;
}
@Override
public PluginEnvironment environment() {
return environment;
}
@Override
public PluginMetrics metrics() {
return metrics;
}
@Override
public Monitor monitor() {
return metrics;
}
@Override
public PluginScheduler scheduler() {
return scheduler;
}
@Override
public File workDir() {
return workDir;
}
}

View File

@ -0,0 +1,26 @@
package org.jetlinks.community.plugin.context;
import lombok.AllArgsConstructor;
import org.jetlinks.core.monitor.Monitor;
import org.jetlinks.core.monitor.logger.Logger;
import org.jetlinks.core.monitor.metrics.Metrics;
import org.jetlinks.core.monitor.tracer.Tracer;
import org.jetlinks.plugin.core.PluginMetrics;
@AllArgsConstructor(staticName = "of")
class MonitorPluginMetrics implements PluginMetrics {
private final Monitor monitor;
@Override
public Logger logger() {
return monitor.logger();
}
@Override
public Tracer tracer() {
return monitor.tracer();
}
@Override
public Metrics metrics() {
return monitor.metrics();
}
}

View File

@ -0,0 +1,29 @@
package org.jetlinks.community.plugin.context;
import org.jetlinks.core.monitor.logger.Logger;
import org.jetlinks.core.monitor.metrics.Metrics;
import org.jetlinks.core.monitor.tracer.Tracer;
import org.jetlinks.plugin.core.PluginMetrics;
public class NonePluginMetrics implements PluginMetrics {
public static final NonePluginMetrics INSTANCE = new NonePluginMetrics();
private NonePluginMetrics() {
}
@Override
public Logger logger() {
return Logger.noop();
}
@Override
public Tracer tracer() {
return Tracer.noop();
}
@Override
public Metrics metrics() {
return Metrics.noop();
}
}

View File

@ -0,0 +1,11 @@
package org.jetlinks.community.plugin.context;
import org.jetlinks.plugin.core.PluginMetrics;
@Deprecated
public interface PluginMetricsSupport {
static PluginMetrics create(String pluginId){
return NonePluginMetrics.INSTANCE;
}
}

View File

@ -0,0 +1,35 @@
package org.jetlinks.community.plugin.context;
import org.jetlinks.plugin.core.PluginEnvironment;
import org.jetlinks.community.ValueObject;
import java.util.Map;
import java.util.Optional;
public class SimplePluginEnvironment implements PluginEnvironment {
private final ValueObject properties;
public SimplePluginEnvironment(Map<String, Object> properties) {
this(ValueObject.of(properties));
}
public SimplePluginEnvironment(ValueObject properties) {
this.properties = properties;
}
@Override
public Optional<String> getProperty(String key) {
return properties.getString(key);
}
@Override
public <T> Optional<T> getProperty(String key, Class<T> type) {
return properties.get(key, type);
}
@Override
public Map<String, Object> getProperties() {
return properties.values();
}
}

View File

@ -0,0 +1,41 @@
package org.jetlinks.community.plugin.context;
import lombok.AllArgsConstructor;
import org.jetlinks.plugin.core.ServiceRegistry;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@AllArgsConstructor
public class SingleServiceRegistry implements ServiceRegistry {
private final String name;
private final Object service;
@Override
public <T> Optional<T> getService(Class<T> type) {
if (type.isInstance(service)) {
return Optional.of(type.cast(service));
}
return Optional.empty();
}
@Override
public <T> Optional<T> getService(Class<T> type, String name) {
if (Objects.equals(name, this.name)) {
return getService(type);
}
return Optional.empty();
}
@Override
public <T> List<T> getServices(Class<T> type) {
if (type.isInstance(service)) {
return Collections.singletonList(type.cast(service));
}
return Collections.emptyList();
}
}

View File

@ -0,0 +1,48 @@
package org.jetlinks.community.plugin.context;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.plugin.core.ServiceRegistry;
import org.springframework.context.ApplicationContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@AllArgsConstructor
@Slf4j
public class SpringServiceRegistry implements ServiceRegistry {
private final ApplicationContext context;
@Override
public <T> Optional<T> getService(Class<T> aClass) {
try {
return Optional.of(context.getBean(aClass));
} catch (Throwable error) {
log.warn("get spring service [{}] error", aClass, error);
return Optional.empty();
}
}
@Override
public <T> Optional<T> getService(Class<T> aClass, String s) {
try {
return Optional.of(context.getBean(s, aClass));
} catch (Throwable error) {
log.warn("get spring service [{}][{}] error", s, aClass, error);
return Optional.empty();
}
}
@Override
public <T> List<T> getServices(Class<T> aClass) {
try {
return new ArrayList<>(context.getBeansOfType(aClass).values());
} catch (Throwable error) {
log.warn("get spring services [{}] error", aClass, error);
return Collections.emptyList();
}
}
}

View File

@ -0,0 +1,75 @@
package org.jetlinks.community.plugin.device;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin;
import org.jetlinks.plugin.internal.device.PluginDeviceManager;
import org.jetlinks.community.command.CommandSupportManagerProviders;
import org.jetlinks.community.plugin.utils.PluginUtils;
import org.jetlinks.sdk.server.SdkServices;
import org.jetlinks.sdk.server.commons.cmd.QueryListCommand;
import org.jetlinks.sdk.server.device.DeviceCommandSupportTypes;
import org.jetlinks.sdk.server.device.DeviceInfo;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.Collections;
import java.util.function.Consumer;
@AllArgsConstructor
public class DefaultPluginDeviceManager implements PluginDeviceManager {
private final DeviceRegistry deviceRegistry;
private final PluginDataIdMapper dataIdMapper;
@Override
public Flux<DeviceOperator> getDevices(DeviceGatewayPlugin plugin) {
return doQuery(q -> applyQuery(plugin, q))
.flatMap(e -> deviceRegistry
.getDevice(e.getId())
.flatMap(device -> PluginUtils.transformToExternalDevice(dataIdMapper, plugin, device))
.subscribeOn(Schedulers.parallel()),
32);
}
@Override
public Flux<DeviceOperator> getDevices(DeviceGatewayPlugin plugin, String productId) {
return dataIdMapper
.getInternalId(PluginDataIdMapper.TYPE_PRODUCT, plugin.getId(), productId)
.flatMapMany(internalId -> doQuery(q -> applyQuery(plugin, q.and("productId", internalId))))
.flatMap(info -> deviceRegistry
.getDevice(info.getId())
.flatMap(device -> PluginUtils.transformToExternalDevice(dataIdMapper, plugin, device))
.subscribeOn(Schedulers.parallel()))
;
}
private Flux<DeviceInfo> doQuery(Consumer<Query<?, QueryParamEntity>> customizer) {
return CommandSupportManagerProviders
.getProviderNow(SdkServices.deviceService)
.getCommandSupport(DeviceCommandSupportTypes.device, Collections.emptyMap())
.flatMapMany(cmd -> cmd.execute(
QueryListCommand
.of(DeviceInfo.class)
.dsl(customizer)));
}
private void applyQuery(DeviceGatewayPlugin plugin,
Query<?, QueryParamEntity> query) {
query
.select("id")
.accept("productId",
"product-info",
QueryParamEntity
.of()
.toQuery()
.is("accessId", plugin.getId())
.getParam()
.getTerms());
}
}

View File

@ -0,0 +1,221 @@
package org.jetlinks.community.plugin.device;
import lombok.AllArgsConstructor;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.Value;
import org.jetlinks.core.Values;
import org.jetlinks.core.device.*;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.Map;
@AllArgsConstructor
public class ExternalDeviceOperator implements DeviceOperator {
private final String externalId;
private final String pluginId;
private final PluginDataIdMapper idMapper;
private final DeviceOperator internal;
@Override
@SuppressWarnings("all")
public <T> T unwrap(Class<T> type) {
if (type == ExternalDeviceOperator.class) {
return (T) type;
}
return internal.unwrap(type);
}
@Override
public boolean isWrapperFor(Class<?> type) {
return internal.isWrapperFor(type) || type == ExternalDeviceOperator.class;
}
@Override
public String getId() {
return externalId;
}
@Override
public String getDeviceId() {
return externalId;
}
@Override
public Mono<String> getConnectionServerId() {
return internal.getConnectionServerId();
}
@Override
public Mono<String> getSessionId() {
return internal.getSessionId();
}
@Override
public Mono<String> getAddress() {
return internal.getAddress();
}
@Override
public Mono<Void> setAddress(String address) {
return internal.setAddress(address);
}
@Override
public Mono<Boolean> putState(byte state) {
return internal.putState(state);
}
@Override
public Mono<Byte> getState() {
return internal.getState();
}
@Override
public Mono<Byte> checkState() {
return internal.checkState();
}
@Override
public Mono<Long> getOnlineTime() {
return internal.getOnlineTime();
}
@Override
public Mono<Long> getOfflineTime() {
return internal.getOfflineTime();
}
@Override
public Mono<Boolean> online(String serverId, String sessionId, String address) {
return internal.online(serverId, sessionId, address);
}
@Override
public Mono<Boolean> online(String serverId, String address, long onlineTime) {
return internal.online(serverId, address, onlineTime);
}
@Override
public Mono<Value> getSelfConfig(String key) {
return internal.getSelfConfig(key);
}
@Override
public Mono<Values> getSelfConfigs(Collection<String> keys) {
return internal.getSelfConfigs(keys);
}
@Override
public Mono<Boolean> offline() {
return internal.offline();
}
@Override
public Mono<Boolean> disconnect() {
return internal.disconnect();
}
@Override
public Mono<AuthenticationResponse> authenticate(AuthenticationRequest request) {
return internal.authenticate(request);
}
@Override
public Mono<DeviceMetadata> getMetadata() {
return internal.getMetadata();
}
@Override
public Mono<ProtocolSupport> getProtocol() {
return internal.getProtocol();
}
@Override
public DeviceMessageSender messageSender() {
return internal.messageSender();
}
@Override
public Mono<Boolean> updateMetadata(String metadata) {
return internal.updateMetadata(metadata);
}
@Override
public Mono<Boolean> updateMetadata(ThingMetadata metadata) {
return internal.updateMetadata(metadata);
}
@Override
public Mono<Void> resetMetadata() {
return internal.resetMetadata();
}
@Override
public Mono<DeviceProductOperator> getProduct() {
return internal
.getProduct()
.flatMap(opt -> idMapper
.getExternalId(PluginDataIdMapper.TYPE_PRODUCT, pluginId, opt.getId())
.map(ext -> new ExternalDeviceProductOperator(ext, opt)));
}
@Override
public Mono<DeviceOperator> getParentDevice() {
return internal
.getParentDevice()
.flatMap(device -> idMapper
.getExternalId(PluginDataIdMapper.TYPE_DEVICE, pluginId, device.getId())
.map(ext -> new ExternalDeviceOperator(ext, pluginId, idMapper, device)));
}
@Override
public Mono<Value> getConfig(String key) {
return internal.getConfig(key);
}
@Override
public Mono<Values> getConfigs(Collection<String> keys) {
return internal.getConfigs(keys);
}
@Override
public Mono<Boolean> setConfig(String key, Object value) {
return internal.setConfig(key, value);
}
@Override
public Mono<Boolean> setConfigs(Map<String, Object> conf) {
return internal.setConfigs(conf);
}
@Override
public Mono<Boolean> removeConfig(String key) {
return internal.removeConfig(key);
}
@Override
public Mono<Value> getAndRemoveConfig(String key) {
return internal.getAndRemoveConfig(key);
}
@Override
public Mono<Boolean> removeConfigs(Collection<String> key) {
return internal.removeConfigs(key);
}
@Override
public Mono<Void> refreshConfig(Collection<String> keys) {
return internal.refreshConfig(keys);
}
@Override
public Mono<Void> refreshAllConfig() {
return internal.refreshAllConfig();
}
}

View File

@ -0,0 +1,99 @@
package org.jetlinks.community.plugin.device;
import lombok.AllArgsConstructor;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.Value;
import org.jetlinks.core.Values;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.things.ThingMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.Map;
@AllArgsConstructor
public class ExternalDeviceProductOperator implements DeviceProductOperator {
private final String externalId;
private final DeviceProductOperator internal;
@Override
public String getId() {
return externalId;
}
@Override
public Mono<DeviceMetadata> getMetadata() {
//todo 自动转换映射
return internal.getMetadata();
}
@Override
public Mono<Boolean> updateMetadata(String metadata) {
return internal.updateMetadata(metadata);
}
@Override
public Mono<Boolean> updateMetadata(ThingMetadata metadata) {
return internal.updateMetadata(metadata);
}
@Override
public Mono<ProtocolSupport> getProtocol() {
return internal.getProtocol();
}
@Override
public Flux<DeviceOperator> getDevices() {
return internal.getDevices();
}
@Override
public Mono<Value> getConfig(String key) {
return internal.getConfig(key);
}
@Override
public Mono<Values> getConfigs(Collection<String> keys) {
return internal.getConfigs(keys);
}
@Override
public Mono<Boolean> setConfig(String key, Object value) {
return internal.setConfig(key,value);
}
@Override
public Mono<Boolean> setConfigs(Map<String, Object> conf) {
return internal.setConfigs(conf);
}
@Override
public Mono<Boolean> removeConfig(String key) {
return internal.removeConfig(key);
}
@Override
public Mono<Value> getAndRemoveConfig(String key) {
return internal.getAndRemoveConfig(key);
}
@Override
public Mono<Boolean> removeConfigs(Collection<String> key) {
return internal.removeConfigs(key);
}
@Override
public Mono<Void> refreshConfig(Collection<String> keys) {
return internal.refreshConfig(keys);
}
@Override
public Mono<Void> refreshAllConfig() {
return internal.refreshAllConfig();
}
}

View File

@ -0,0 +1,52 @@
package org.jetlinks.community.plugin.device;
import lombok.AllArgsConstructor;
import org.jetlinks.core.device.*;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import reactor.core.publisher.Mono;
@AllArgsConstructor
public class ExternalDeviceRegistry implements DeviceRegistry {
private final String pluginId;
private final PluginDataIdMapper idMapper;
private final DeviceRegistry internal;
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return idMapper
.getInternalId(PluginDataIdMapper.TYPE_DEVICE, pluginId, deviceId)
.flatMap(internal::getDevice)
.map(opt -> new ExternalDeviceOperator(deviceId, pluginId, idMapper, opt));
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return idMapper
.getInternalId(PluginDataIdMapper.TYPE_PRODUCT, pluginId, productId)
.flatMap(internal::getProduct)
.map(opt -> new ExternalDeviceProductOperator(productId, opt));
}
@Override
public Mono<DeviceOperator> register(DeviceInfo deviceInfo) {
return Mono.error(new UnsupportedOperationException());
}
@Override
public Mono<DeviceProductOperator> register(ProductInfo productInfo) {
return Mono.error(new UnsupportedOperationException());
}
@Override
public Mono<Void> unregisterDevice(String deviceId) {
return Mono.error(new UnsupportedOperationException());
}
@Override
public Mono<Void> unregisterProduct(String productId) {
return Mono.error(new UnsupportedOperationException());
}
}

View File

@ -0,0 +1,64 @@
package org.jetlinks.community.plugin.device;
import org.jetlinks.core.command.Command;
import org.jetlinks.core.command.CommandSupport;
import org.jetlinks.core.command.ProxyCommandSupport;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
public class PluginDeviceGateway extends AbstractDeviceGateway implements ProxyCommandSupport {
private final DeviceGatewayPlugin plugin;
public PluginDeviceGateway(String id, DeviceGatewayPlugin plugin) {
super(id);
this.plugin = plugin;
}
@Override
public CommandSupport getProxyTarget() {
return plugin;
}
@Override
protected Mono<Void> doShutdown() {
return plugin.shutdown();
}
@Override
protected Mono<Void> doStartup() {
return plugin.start();
}
public DeviceGatewayPlugin getPlugin() {
return plugin;
}
@Nonnull
@Override
public <R> R execute(@Nonnull Command<R> command) {
return plugin.execute(command);
}
@Override
public <R, C extends Command<R>> C createCommand(String commandId) {
return plugin.createCommand(commandId);
}
@Override
public Flux<FunctionMetadata> getCommandMetadata() {
return plugin.getCommandMetadata();
}
@Override
public Mono<FunctionMetadata> getCommandMetadata(String commandId) {
return plugin.getCommandMetadata(commandId);
}
}

View File

@ -0,0 +1,300 @@
package org.jetlinks.community.plugin.device;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.plugin.context.*;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.monitor.Monitor;
import org.jetlinks.core.server.session.DeviceSessionProvider;
import org.jetlinks.core.server.session.DeviceSessionProviders;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.plugin.core.PluginRegistry;
import org.jetlinks.plugin.core.ServiceRegistry;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin;
import org.jetlinks.plugin.internal.device.PluginDeviceGatewayService;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.codec.Serializers;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
import org.jetlinks.community.plugin.PluginDriverManager;
import org.jetlinks.community.plugin.context.*;
import org.jetlinks.community.plugin.monitor.PluginMonitorHelper;
import org.jetlinks.community.plugin.utils.PluginUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.io.*;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
public class PluginDeviceGatewayProvider extends CompositeProtocolSupport
implements DeviceGatewayProvider, DeviceSessionProvider {
public static final String CHANNEL_PLUGIN = PluginTransport.plugin.getId();
public static final String PROVIDER = "plugin_gateway";
private final PluginRegistry registry;
private final PluginDriverManager driverManager;
private final ServiceRegistry serviceRegistry;
private final PluginDataIdMapper idMapper;
private final EventBus eventBus;
private final PluginDeviceGatewayService gatewayService;
private final DeviceRegistry deviceRegistry;
private final Map<String, DeviceGatewayPlugin> plugins = new ConcurrentHashMap<>();
public PluginDeviceGatewayProvider(PluginRegistry registry,
PluginDriverManager driverManager,
ServiceRegistry serviceRegistry,
PluginDataIdMapper idMapper,
DeviceRegistry deviceRegistry,
PluginDeviceGatewayService gatewayService,
EventBus eventBus) {
this.registry = registry;
this.deviceRegistry = deviceRegistry;
this.driverManager = driverManager;
this.serviceRegistry = serviceRegistry;
this.idMapper = idMapper;
this.gatewayService = gatewayService;
this.eventBus = eventBus;
setId(PROVIDER);
setName("插件设备接入");
addMessageCodecSupport(new PluginMessageCodec());
//状态检查
setDeviceStateChecker(
device -> device
.getConfig(PropertyConstants.accessId)
.mapNotNull(plugins::get)
.flatMap(plugin -> {
//转换为插件侧的设备操作接口
return PluginUtils
.transformToExternalDevice(idMapper, plugin, device)
.flatMap(plugin::getDeviceState);
})
);
//监听设备注册
doOnDeviceRegister(device -> device
.getConfig(PropertyConstants.accessId)
.mapNotNull(plugins::get)
.flatMap(plugin -> {
//转换为插件侧的设备操作接口
return PluginUtils
.transformToExternalDevice(idMapper, plugin, device)
.flatMap(plugin::doOnDeviceRegister);
}));
//监听设备注销
doOnDeviceUnRegister(device -> device
.getConfig(PropertyConstants.accessId)
.mapNotNull(plugins::get)
.flatMap(plugin -> {
//转换为插件侧的设备操作接口
return PluginUtils
.transformToExternalDevice(idMapper, plugin, device)
.flatMap(plugin::doOnDeviceUnregister);
}));
//监听产品注册
doOnProductRegister(product -> product
.getConfig(PropertyConstants.accessId)
.mapNotNull(plugins::get)
.flatMap(plugin -> {
//转换为插件侧的设备操作接口
return PluginUtils
.transformToExternalProduct(idMapper, plugin, product)
.flatMap(plugin::doOnProductRegister);
}));
//session序列化
DeviceSessionProviders.register(this);
}
@Override
public String getName() {
return LocaleUtils.resolveMessage("device.gateway.provider.plugin_gateway.name", super.getName());
}
@Override
public String getDescription() {
return LocaleUtils.resolveMessage("device.gateway.provider.plugin_gateway.description", super.getDescription());
}
public DeviceGatewayPlugin getPlugin(String id) {
return plugins.get(id);
}
@Override
public Transport getTransport() {
return PluginTransport.plugin;
}
@SuppressWarnings("all")
private File createWorkdir(String id) {
File workDir = new File("./data/plugins/device-gateway/" + id);
if (!workDir.exists()) {
workDir.mkdirs();
}
return workDir;
}
@Override
public String getChannel() {
return CHANNEL_PLUGIN;
}
private Mono<DeviceGateway> createGateway(DeviceGatewayProperties properties,
PluginDriver driver) {
Monitor monitor = PluginMonitorHelper.createMonitor(eventBus,driver.getType().getId(), properties.getId());
ClusterPluginScheduler scheduler = new ClusterPluginScheduler(properties.getId(), monitor);
return driver
.createPlugin(
properties.getId(),
DefaultPluginContext.of(
registry,
new CompositeServiceRegistry(
Arrays.asList(
//在插件中获取注册中心时自动转换ID
new SingleServiceRegistry("deviceRegistry",
new ExternalDeviceRegistry(properties.getId(), idMapper, deviceRegistry)),
//命令服务
CommandServiceRegistry.instance(),
//获取其他服务时使用默认的服务注册中心
serviceRegistry
)
),
new SimplePluginEnvironment(properties),
monitor,
scheduler,
createWorkdir(properties.getId())
))
.map(plugin -> {
DeviceGatewayPlugin gatewayPlugin = plugin.unwrap(DeviceGatewayPlugin.class);
PluginDeviceGateway gateway = new PluginDeviceGateway(gatewayPlugin.getId(), gatewayPlugin);
plugins.put(gatewayPlugin.getId(), gatewayPlugin);
//停止网关时停止所有调度任务
gateway.doOnShutdown(scheduler);
//移除网关
gateway.doOnShutdown(() -> plugins.remove(gatewayPlugin.getId(), gatewayPlugin));
return gateway;
});
}
@Override
public Mono<? extends DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) {
return Mono.defer(() -> driverManager
.getDriver(properties.getChannelId())
.switchIfEmpty(Mono.error(() -> new BusinessException("error.plugin_driver_does_not_exist", properties.getChannelId())))
.flatMap(driver -> createGateway(properties, driver)));
}
@Override
public Mono<? extends DeviceGateway> reloadDeviceGateway(DeviceGateway gateway,
DeviceGatewayProperties properties) {
return gateway
.unwrap(PluginDeviceGateway.class)
.shutdown()
.then(createDeviceGateway(properties))
.flatMap(gate -> gate.startup().thenReturn(gate));
}
@Override
public Mono<PersistentSession> deserialize(byte[] sessionData, DeviceRegistry registry) {
return Mono
.fromCallable(() -> {
try (ObjectInput input = Serializers
.getDefault()
.createInput(new ByteArrayInputStream(sessionData))) {
return PluginDeviceSession.read(input, registry, plugins::get);
}
})
.flatMap(Function.identity());
}
@Override
public Mono<byte[]> serialize(PersistentSession session, DeviceRegistry registry) {
if (!session.isWrapFrom(PluginDeviceSession.class)) {
return Mono.empty();
}
PluginDeviceSession deviceSession = session.unwrap(PluginDeviceSession.class);
return Mono.fromCallable(() -> {
ByteArrayOutputStream out = new ByteArrayOutputStream(70);
try (ObjectOutput output = Serializers.getDefault().createOutput(out)) {
deviceSession.write(output);
}
return out.toByteArray();
});
}
class PluginMessageCodec implements DeviceMessageCodec {
@Override
public Transport getSupportTransport() {
return PluginTransport.plugin;
}
@Nonnull
@Override
public Publisher<? extends Message> decode(@Nonnull MessageDecodeContext context) {
//never happen
return Mono.empty();
}
@Nonnull
@Override
public Publisher<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
DeviceOperator device = context.getDevice();
Message message = context.getMessage();
if (null == device || !(message instanceof DeviceMessage)) {
return Mono.empty();
}
return context
.reply(
device
// 设备接入网关ID就是插件ID
.getConfig(PropertyConstants.accessId)
.mapNotNull(plugins::get)
.switchIfEmpty(Mono.error(() -> new DeviceOperationException
.NoStackTrace(ErrorCode.SERVER_NOT_AVAILABLE, "error.plugin_not_found")))
.flatMapMany(plugin -> PluginUtils
.transformToExternalMessage(idMapper, plugin, ((DeviceMessage) message).copy())
.flatMapMany(plugin.unwrap(DeviceGatewayPlugin.class)::execute)
.flatMap(reply -> PluginUtils.transformToInternalMessage(idMapper, plugin, reply.copy())))
)
.then(Mono.empty());
}
}
}

View File

@ -0,0 +1,57 @@
package org.jetlinks.community.plugin.device;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin;
import org.jetlinks.plugin.internal.device.PluginDeviceGatewayService;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import org.jetlinks.community.plugin.utils.PluginUtils;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import reactor.core.publisher.Mono;
public class PluginDeviceGatewayServiceImpl implements PluginDeviceGatewayService {
private final DeviceGatewayHelper gatewayHelper;
private final PluginDataIdMapper dataIdMapper;
public PluginDeviceGatewayServiceImpl(DeviceRegistry registry,
DeviceSessionManager sessionManager,
DecodedClientMessageHandler handler,
PluginDataIdMapper dataIdMapper) {
this.gatewayHelper = new DeviceGatewayHelper(registry, sessionManager, handler);
this.dataIdMapper = dataIdMapper;
}
@Override
public Mono<Void> handleMessage(DeviceGatewayPlugin plugin,
DeviceMessage deviceMessage) {
return PluginUtils
.transformToInternalMessage(dataIdMapper, plugin, deviceMessage)
.flatMap(msg -> handleMessage0(plugin, msg));
}
private Mono<Void> handleMessage0(DeviceGatewayPlugin deviceGatewayPlugin,
DeviceMessage deviceMessage) {
return gatewayHelper
.handleDeviceMessage(
deviceMessage,
PluginDeviceSession::new,
session -> {
if (session.isWrapFrom(PluginDeviceSession.class)) {
session
.unwrap(PluginDeviceSession.class)
.setPlugin(deviceGatewayPlugin);
}
}, () -> {
})
.then();
}
}

View File

@ -0,0 +1,149 @@
package org.jetlinks.community.plugin.device;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin;
import reactor.core.publisher.Mono;
import javax.annotation.Nullable;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.util.function.Function;
public class PluginDeviceSession implements PersistentSession {
private final DeviceOperator device;
@Setter
@Getter
private DeviceGatewayPlugin plugin;
private long connectTime = System.currentTimeMillis();
private long pingTime = connectTime;
private long timeout = -1;
public PluginDeviceSession(DeviceOperator device) {
this.device = device;
}
@Override
public String getId() {
return device.getId();
}
@Override
public String getDeviceId() {
return device.getDeviceId();
}
@Nullable
@Override
public DeviceOperator getOperator() {
return device;
}
@Override
public long lastPingTime() {
return pingTime;
}
@Override
public long connectTime() {
return connectTime;
}
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return Reactors.ALWAYS_FALSE;
}
@Override
public Transport getTransport() {
return PluginTransport.plugin;
}
@Override
public void close() {
}
@Override
public void keepAlive() {
pingTime = System.currentTimeMillis();
}
@Override
public void setKeepAliveTimeout(Duration timeout) {
this.timeout = timeout.toMillis();
}
@Override
public void ping() {
pingTime = System.currentTimeMillis();
}
@Override
public boolean isAlive() {
return timeout <= 0 || System.currentTimeMillis() - pingTime < timeout;
}
@Override
public Mono<Boolean> isAliveAsync() {
//检查真实状态?
return PersistentSession.super.isAliveAsync();
}
@Override
public void onClose(Runnable call) {
}
@Override
public String getProvider() {
return PluginDeviceGatewayProvider.PROVIDER;
}
@SneakyThrows
void write(ObjectOutput out) {
out.writeUTF(getDeviceId());
out.writeLong(connectTime);
out.writeLong(pingTime);
out.writeLong(timeout);
SerializeUtils.writeNullableUTF(plugin == null ? null : plugin.getId(), out);
}
@SneakyThrows
@SuppressWarnings("all")
static Mono<PluginDeviceSession> read(ObjectInput input,
DeviceRegistry registry,
Function<String, DeviceGatewayPlugin> pluginLoader) {
String deviceId = input.readUTF();
long connectTime = input.readLong();
long pingTime = input.readLong();
long timeout = input.readLong();
String pluginId = SerializeUtils.readNullableUTF(input);
return registry
.getDevice(deviceId)
.map(device -> {
PluginDeviceSession session = new PluginDeviceSession(device);
session.connectTime = connectTime;
session.pingTime = pingTime;
session.timeout = timeout;
if (pluginId != null) {
session.plugin = pluginLoader.apply(pluginId);
}
return session;
});
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.community.plugin.device;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.jetlinks.core.message.codec.Transport;
@AllArgsConstructor
@Getter
public enum PluginTransport implements Transport {
plugin("插件");
private final String name;
@Override
public String getId() {
return name();
}
}

View File

@ -0,0 +1,45 @@
package org.jetlinks.community.plugin.impl;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.community.plugin.PluginDriverConfig;
import org.jetlinks.community.plugin.PluginDriverInstaller;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultPluginDriverInstaller implements PluginDriverInstaller {
private final Map<String, PluginDriverInstallerProvider> providers = new ConcurrentHashMap<>();
public void addProvider(PluginDriverInstallerProvider provider) {
providers.put(provider.provider(), provider);
}
@Override
public Mono<PluginDriver> install(PluginDriverConfig config) {
PluginDriverInstallerProvider provider = providers.get(config.getProvider());
if (null == provider) {
return Mono.error(() -> new UnsupportedOperationException("unsupported plugin provider:" + config.getProvider()));
}
return provider.install(config);
}
@Override
public Mono<PluginDriver> reload(PluginDriver driver, PluginDriverConfig config) {
PluginDriverInstallerProvider provider = providers.get(config.getProvider());
if (null == provider) {
return Mono.error(() -> new UnsupportedOperationException("unsupported plugin provider:" + config.getProvider()));
}
return provider.reload(driver,config);
}
@Override
public Mono<Void> uninstall(PluginDriverConfig config) {
PluginDriverInstallerProvider provider = providers.get(config.getProvider());
if (provider == null) {
return Mono.empty();
}
return provider.uninstall(config);
}
}

View File

@ -0,0 +1,204 @@
package org.jetlinks.community.plugin.impl;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.core.cache.ReactiveCacheContainer;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.community.plugin.PluginDriverInstaller;
import org.jetlinks.community.plugin.PluginDriverListener;
import org.jetlinks.community.plugin.PluginDriverManager;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
* 使用{@link PluginDriverEntity}来管理插件驱动在对{@link PluginDriverEntity}CRUD时,会自动进行安装和卸载操作
*
* @author zhouhao
* @since 2.0
*/
@Slf4j
public class DefaultPluginDriverManager implements PluginDriverManager, CommandLineRunner {
private final ReactiveCacheContainer<String, PluginDriver> cache = ReactiveCacheContainer.create();
private final PluginDriverInstaller installer;
private final ReactiveRepository<PluginDriverEntity, String> repository;
private final List<PluginDriverListener> listeners = new CopyOnWriteArrayList<>();
public DefaultPluginDriverManager(ReactiveRepository<PluginDriverEntity, String> repository,
PluginDriverInstaller installer) {
this.installer = installer;
this.repository = repository;
}
@Override
public Flux<PluginDriver> getDrivers() {
return cache.values();
}
@Override
public Mono<PluginDriver> getDriver(String id) {
PluginDriver cached = cache.getNow(id);
if (null != cached) {
return Mono.just(cached);
}
AtomicBoolean installed = new AtomicBoolean();
return cache
.computeIfAbsent(id, _id -> repository
.findById(_id)
.flatMap(e -> {
installed.set(true);
return installer.install(e.toConfig());
}))
//加载完成后再执行监听器
.flatMap(driver -> {
if (installed.get()) {
return this
.fireListener(listener -> listener.onInstall(id, driver))
.thenReturn(driver);
}
return Mono.just(driver);
});
}
@Override
public Disposable listen(PluginDriverListener listener) {
listeners.add(listener);
return () -> listeners.remove(listener);
}
private Mono<Void> fireListener(Function<PluginDriverListener, Mono<Void>> call) {
if (listeners.isEmpty()) {
return Mono.empty();
}
return Flux
.fromIterable(listeners)
.flatMap(listener -> call
.apply(listener)
.onErrorResume(err -> {
log.warn("fire driver listener[{}] error", listener, err);
return Mono.empty();
}))
.then();
}
private Mono<PluginDriver> loadDriver(PluginDriverEntity entity){
return loadDriver(entity,true);
}
private Mono<PluginDriver> loadDriver(PluginDriverEntity entity,boolean reload) {
PluginDriver[] _old = new PluginDriver[1];
return cache
.compute(entity.getId(), (key, old) -> {
_old[0] = old;
if (old == null) {
return installer
.install(entity.toConfig())
.map(PluginDriverWrapper::new);
}
if(reload){
return installer
.reload(old.unwrap(PluginDriver.class), entity.toConfig())
.map(PluginDriverWrapper::new);
}
return Mono.just(old);
})
//加载完成后再触发监听器,否则如果在监听器中获取驱动可能会导致"死锁"
.flatMap(driver -> {
if (reload && _old[0] != null) {
return this
.fireListener(listener -> listener.onReload(entity.getId(), _old[0], driver))
.thenReturn(driver);
}
if (_old[0] == null) {
return this
.fireListener(listener -> listener.onInstall(entity.getId(), driver))
.thenReturn(driver);
}
return Mono.just(driver);
})
.onErrorMap(err -> new BusinessException.NoStackTrace("error.unable_to_load_plugin_driver", err));
}
@EventListener
public void handleEvent(EntityCreatedEvent<PluginDriverEntity> event) {
event.async(
Flux.fromIterable(event.getEntity())
.concatMap(this::loadDriver)
);
}
@EventListener
public void handleEvent(EntitySavedEvent<PluginDriverEntity> event) {
event.async(
Flux.fromIterable(event.getEntity())
.concatMap(this::loadDriver)
);
}
@EventListener
public void handleEvent(EntityModifyEvent<PluginDriverEntity> event) {
event.async(
Flux.fromIterable(event.getAfter())
.concatMap(this::loadDriver)
);
}
@EventListener
public void handleEvent(EntityDeletedEvent<PluginDriverEntity> event) {
event.async(
Flux.fromIterable(event.getEntity())
.concatMap(this::doHandleDeleted)
);
}
public Mono<Void> doHandleDeleted(PluginDriverEntity entity) {
PluginDriver driver = cache.remove(entity.getId());
if (null != driver) {
return this
.fireListener(listener -> listener.onUninstall(entity.getId(), driver))
.then(installer.uninstall(entity.toConfig()))
.onErrorResume(err -> {
log.warn("uninstall driver [{}] error", entity.getId(), err);
return Mono.empty();
});
}
return installer
.uninstall(entity.toConfig())
.onErrorResume(err -> {
log.warn("uninstall driver [{}] error", entity.getId(), err);
return Mono.empty();
});
}
@Override
public void run(String... args) throws Exception {
repository
.createQuery()
.fetch()
.flatMap(e -> this
.loadDriver(e, false)
.onErrorResume(err -> {
log.error("loader plugin driver [{}] error", e.getId(), err);
return Mono.empty();
}))
.then(fireListener(PluginDriverListener::onStartup))
.subscribe();
}
}

View File

@ -0,0 +1,23 @@
package org.jetlinks.community.plugin.impl;
import org.jetlinks.plugin.core.Plugin;
import org.jetlinks.plugin.core.PluginRegistry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class NonePluginRegistry implements PluginRegistry {
@Override
public Mono<Plugin> getPlugin(String type, String pluginId) {
return Mono.empty();
}
@Override
public Flux<Plugin> getPlugins(String type) {
return Flux.empty();
}
@Override
public Flux<Plugin> getPlugins() {
return Flux.empty();
}
}

View File

@ -0,0 +1,124 @@
package org.jetlinks.community.plugin.impl;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
import org.hswebframework.web.api.crud.entity.RecordModifierEntity;
import org.hswebframework.web.crud.annotation.EnableEntityEvent;
import org.hswebframework.web.crud.generator.Generators;
import org.hswebframework.web.validator.CreateGroup;
import org.jetlinks.plugin.core.PluginType;
import org.jetlinks.community.plugin.PluginDriverConfig;
import javax.persistence.Column;
import javax.persistence.GeneratedValue;
import javax.persistence.Table;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import java.sql.JDBCType;
import java.util.Collections;
import java.util.Map;
@Table(name = "s_plugin_driver")
@Getter
@Setter
@EnableEntityEvent
public class PluginDriverEntity extends GenericEntity<String> implements RecordCreationEntity, RecordModifierEntity {
@Override
@GeneratedValue(generator = Generators.SNOW_FLAKE)
@Pattern(regexp = "^[0-9a-zA-Z_\\-]+$", message = "ID只能由数字,字母,下划线和中划线组成", groups = CreateGroup.class)
@Schema(description = "ID(只能由数字,字母,下划线和中划线组成)")
public String getId() {
return super.getId();
}
@Column(length = 64, nullable = false)
@NotBlank(groups = CreateGroup.class)
@Schema(description = "名称")
private String name;
@Column
@Schema(description = "说明")
private String description;
/**
* @see PluginType#getId()
*/
@Column(length = 32, nullable = false)
@NotBlank(groups = CreateGroup.class)
private String type;
/**
* @see PluginDriverInstallerProvider#provider()
*/
@Column(length = 32, nullable = false)
@NotBlank(groups = CreateGroup.class)
private String provider;
/**
* @see PluginDriverInstallerProvider#install(PluginDriverConfig)
*/
@Column
@JsonCodec
@ColumnType(jdbcType = JDBCType.LONGVARCHAR, javaType = String.class)
private Map<String, Object> configuration;
@Column(length = 32)
@Schema(description = "插件版本")
private String version;
@Column
@Schema(description = "插件文件名")
private String filename;
@Column(updatable = false)
@Schema(
description = "创建者ID(只读)"
, accessMode = Schema.AccessMode.READ_ONLY
)
private String creatorId;
@Column(updatable = false)
@DefaultValue(generator = Generators.CURRENT_TIME)
@Schema(
description = "创建时间(只读)"
, accessMode = Schema.AccessMode.READ_ONLY
)
private Long createTime;
@Column(name = "creator_name", updatable = false)
@Schema(
description = "创建者名称(只读)"
, accessMode = Schema.AccessMode.READ_ONLY
)
private String creatorName;
@Column(length = 64)
@Schema(description = "修改人名称")
private String modifierName;
@Column(length = 64)
@Schema(description = "修改人")
private String modifierId;
@Column
@Schema(description = "修改时间")
@DefaultValue(generator = Generators.CURRENT_TIME)
private Long modifyTime;
public PluginDriverConfig toConfig() {
PluginDriverConfig config = new PluginDriverConfig();
config.setId(getId());
config.setProvider(provider);
config.setConfiguration(configuration==null? Collections.emptyMap():configuration);
return config;
}
}

View File

@ -0,0 +1,67 @@
package org.jetlinks.community.plugin.impl;
import org.hswebframework.web.crud.events.EntityBeforeDeleteEvent;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.plugin.internal.InternalPluginType;
import org.jetlinks.community.gateway.DeviceGatewayManager;
import org.jetlinks.community.plugin.PluginDriverListener;
import org.jetlinks.community.plugin.PluginDriverManager;
import org.jetlinks.community.reference.DataReferenceManager;
import org.springframework.context.event.EventListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 插件事件.
* 处理设备接入网关相关的联动
*
* @author zhangji 2023/4/19
*/
public class PluginDriverHandler implements PluginDriverListener {
private final DeviceGatewayManager deviceGatewayManager;
private final DataReferenceManager referenceManager;
public PluginDriverHandler(DeviceGatewayManager deviceGatewayManager,
DataReferenceManager referenceManager,
PluginDriverManager driverManager) {
this.deviceGatewayManager = deviceGatewayManager;
this.referenceManager = referenceManager;
driverManager.listen(this);
}
@Override
public Mono<Void> onInstall(String driverId, PluginDriver driver) {
if(InternalPluginType.deviceGateway.eq(driver.getType().getId())){
return reloadGateway(driverId);
}
return PluginDriverListener.super.onInstall(driverId, driver);
}
@Override
public Mono<Void> onReload(String driverId, PluginDriver oldDriver, PluginDriver driver) {
if(InternalPluginType.deviceGateway.eq(driver.getType().getId())){
return reloadGateway(driverId);
}
return PluginDriverListener.super.onReload(driverId, oldDriver, driver);
}
@EventListener
public void handleDeleteBefore(EntityBeforeDeleteEvent<PluginDriverEntity> event) {
event.async(
Flux.fromIterable(event.getEntity())
.flatMap(entity -> referenceManager
.assertNotReferenced(
DataReferenceManager.TYPE_PLUGIN, entity.getId(), "error.plugin_driver_referenced"
))
);
}
private Mono<Void> reloadGateway(String pluginId) {
return referenceManager
.getReferences(DataReferenceManager.TYPE_PLUGIN, pluginId)
.flatMap(referenceInfo -> deviceGatewayManager.reloadLocal(referenceInfo.getReferenceId()))
.then();
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.community.plugin.impl;
import org.jetlinks.community.plugin.PluginDriverInstaller;
/**
* 插件启动安装器提供商,用于支持不同的插件类型,: jar等
*
* @author zhouhao
* @since 2.0
*/
public interface PluginDriverInstallerProvider extends PluginDriverInstaller {
/**
* 提供商标识
* @return 标识
*/
String provider();
}

View File

@ -0,0 +1,14 @@
package org.jetlinks.community.plugin.impl;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.springframework.stereotype.Service;
/**
* 插件驱动管理.
*
* @author zhangji 2023/3/6
*/
@Service
public class PluginDriverService extends GenericReactiveCrudService<PluginDriverEntity, String> {
}

View File

@ -0,0 +1,73 @@
package org.jetlinks.community.plugin.impl;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.exception.I18nSupportException;
import org.jetlinks.core.command.Command;
import org.jetlinks.core.command.ProxyCommandSupportAdapter;
import org.jetlinks.plugin.core.*;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Objects;
class PluginDriverWrapper extends ProxyCommandSupportAdapter implements PluginDriver {
private final PluginDriver target;
public PluginDriverWrapper(PluginDriver target) {
super(target);
this.target = target;
}
@Nonnull
@Override
public Description getDescription() {
return target.getDescription();
}
@Nonnull
@Override
public PluginType getType() {
return target.getType();
}
@Nonnull
@Override
public Mono<? extends Plugin> createPlugin(@Nonnull String pluginId, @Nonnull PluginContext context) {
try {
return target
.createPlugin(pluginId, context)
.onErrorResume(
err -> !(err instanceof BusinessException),
error -> Mono.error(new I18nSupportException.NoStackTrace(
"error.create_plugin_error",
error,
Objects.toString(error.getLocalizedMessage(), error.getClass().getSimpleName()))));
} catch (Throwable error) {
return Mono.error(new I18nSupportException.NoStackTrace(
"error.create_plugin_error",
error,
Objects.toString(error.getLocalizedMessage(), error.getClass().getSimpleName())));
}
}
@Override
public boolean isWrapperFor(Class<?> type) {
return target.isWrapperFor(type);
}
@Override
public <T> T unwrap(Class<T> type) {
return target.unwrap(type);
}
@Override
public Flux<DataBuffer> getResource(String name) {
return target.getResource(name);
}
}

View File

@ -0,0 +1,137 @@
package org.jetlinks.community.plugin.impl;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.community.plugin.PluginDriverListener;
import org.springframework.context.MessageSource;
import org.springframework.context.MessageSourceResolvable;
import org.springframework.context.NoSuchMessageException;
import org.springframework.context.support.ResourceBundleMessageSource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 插件采集器-国际化管理.
*
* @author zhangji 2024/10/26
* @since 2.3
*/
public class PluginI18nMessageSource implements MessageSource, PluginDriverListener {
public static final String PATTERN = "classpath:i18n/**";
private final Map<String, MessageSource> messageSources = new ConcurrentHashMap<>();
public void addMessageSources(Map<String, MessageSource> source) {
messageSources.putAll(source);
}
public void addMessageSource(String provider, MessageSource source) {
messageSources.put(provider, source);
}
public void removeMessageSource(String provider) {
messageSources.remove(provider);
}
@Override
public String getMessage(@Nonnull String code, Object[] args, String defaultMessage, @Nonnull Locale locale) {
for (MessageSource messageSource : messageSources.values()) {
String result = messageSource.getMessage(code, args, null, locale);
if (StringUtils.hasText(result)) {
return result;
}
}
return defaultMessage;
}
@Override
@Nonnull
public String getMessage(@Nonnull String code, Object[] args, @Nonnull Locale locale) throws NoSuchMessageException {
for (MessageSource messageSource : messageSources.values()) {
try {
String result = messageSource.getMessage(code, args, locale);
if (StringUtils.hasText(result)) {
return result;
}
} catch (NoSuchMessageException ignore) {
}
}
throw new NoSuchMessageException(code, locale);
}
@Override
@Nonnull
public String getMessage(@Nonnull MessageSourceResolvable resolvable, @Nonnull Locale locale) throws NoSuchMessageException {
for (MessageSource messageSource : messageSources.values()) {
try {
String result = messageSource.getMessage(resolvable, locale);
if (StringUtils.hasText(result)) {
return result;
}
} catch (NoSuchMessageException ignore) {
}
}
String[] codes = resolvable.getCodes();
throw new NoSuchMessageException(!ObjectUtils.isEmpty(codes) ? codes[codes.length - 1] : "", locale);
}
@Override
public Mono<Void> onInstall(String driverId, PluginDriver driver) {
ClassLoader classLoader = driver.unwrap(PluginDriver.class).getClass().getClassLoader();
try {
addMessageSource(
driverId,
getMessageSource(classLoader)
);
} catch (IOException ignore) {
}
return Mono.empty();
}
public MessageSource getMessageSource(ClassLoader classLoader) throws IOException {
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(classLoader);
Resource[] resources = resolver.getResources(PATTERN);
ResourceBundleMessageSource messageSource = new ResourceBundleMessageSource();
messageSource.setDefaultEncoding("UTF-8");
messageSource.setBundleClassLoader(classLoader);
for (Resource resource : resources) {
String path = resource.getURL().getPath();
if (StringUtils.hasText(path) && (path.endsWith(".properties") || path.endsWith(".xml"))) {
path = path.substring(path.lastIndexOf("i18n"));
String[] split = path.split("[/|\\\\]");
String name = split[split.length - 1];
name = name.contains("_") ? name.substring(0, name.indexOf("_")) : name;
split[split.length - 1] = name;
messageSource.addBasenames(String.join("/", split));
}
}
return messageSource;
}
@Override
public Mono<Void> onReload(String driverId, PluginDriver oldDriver, PluginDriver driver) {
removeMessageSource(driverId);
return onInstall(driverId, driver);
}
@Override
public Mono<Void> onUninstall(String driverId, PluginDriver driver) {
removeMessageSource(driverId);
return Mono.empty();
}
}

View File

@ -0,0 +1,135 @@
package org.jetlinks.community.plugin.impl.id;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.core.Value;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import org.jetlinks.plugin.internal.PluginDataMapping;
import org.springframework.context.event.EventListener;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import java.util.HashMap;
import java.util.function.Function;
@AllArgsConstructor
public class DefaultPluginDataIdMapper implements PluginDataIdMapper {
private final ReactiveRepository<PluginDataIdMappingEntity, String> repository;
private final ConfigStorageManager storageManager;
@EventListener
public void handleEvent(EntityCreatedEvent<PluginDataIdMappingEntity> event){
event.async(
saveMapping(Flux.fromIterable(event.getEntity()))
);
}
@EventListener
public void handleEvent(EntityModifyEvent<PluginDataIdMappingEntity> event){
event.async(
saveMapping(Flux.fromIterable(event.getAfter()))
);
}
@EventListener
public void handleEvent(EntitySavedEvent<PluginDataIdMappingEntity> event){
event.async(
saveMapping(Flux.fromIterable(event.getEntity()))
);
}
@EventListener
public void handleEvent(EntityDeletedEvent<PluginDataIdMappingEntity> event){
event.async(
removeMapping(Flux.fromIterable(event.getEntity()))
);
}
protected <T> Mono<T> doWithStore(Function<ConfigStorage, Mono<T>> mapper) {
return storageManager
.getStorage("plugin-id-mapping")
.flatMap(mapper);
}
private Mono<Void> saveMapping(Flux<PluginDataIdMappingEntity> entityFlux) {
return this
.doWithStore(store -> entityFlux
.flatMap(e -> Flux.just(
Tuples.of(
createMappingKey(e.getType(), e.getPluginId(), e.getInternalId()),
e.getExternalId()
),
Tuples.of(
createMappingKey(e.getType(), e.getPluginId(), e.getExternalId()),
e.getInternalId()
)
))
.reduce(new HashMap<String, Object>(), (map, tp2) -> {
map.put(tp2.getT1(), tp2.getT2());
return map;
})
.flatMap(store::setConfigs))
.then();
}
public Mono<Void> removeMapping(Flux<PluginDataIdMappingEntity> entityFlux) {
return this
.doWithStore(store -> entityFlux
.flatMap(e -> Flux.just(
createMappingKey(e.getType(), e.getPluginId(), e.getInternalId()),
createMappingKey(e.getType(), e.getPluginId(), e.getExternalId())
))
.buffer(200)
.flatMap(store::remove)
.then());
}
private String createMappingKey(String type, String pluginId, String id) {
return DigestUtils.md5Hex(String.join("|", type, pluginId, id));
}
@Override
public Mono<String> getInternalId(String type,
String pluginId,
String externalId) {
Assert.notNull(externalId,"externalId must not be null");
return doWithStore(store -> store
.getConfig(createMappingKey(type, pluginId, externalId))
.map(Value::asString))
.defaultIfEmpty(externalId);
}
@Override
public Mono<String> getExternalId(String type,
String pluginId,
String internalId) {
Assert.notNull(internalId,"internalId must not be null");
return doWithStore(store -> store
.getConfig(createMappingKey(type, pluginId, internalId))
.map(Value::asString))
.defaultIfEmpty(internalId);
}
@Override
public Flux<PluginDataMapping> getMappings(String type, String pluginId) {
return repository
.createQuery()
.where(PluginDataIdMappingEntity::getType,type)
.and(PluginDataIdMappingEntity::getPluginId,pluginId)
.fetch()
.map(entity-> new PluginDataMapping(entity.getExternalId(),entity.getInternalId()));
}
}

View File

@ -0,0 +1,64 @@
package org.jetlinks.community.plugin.impl.id;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.crud.annotation.EnableEntityEvent;
import org.hswebframework.web.utils.DigestUtils;
import javax.persistence.Column;
import javax.persistence.Index;
import javax.persistence.Table;
@Getter
@Setter
@Table(name = "s_plugin_id_mapping", indexes = {
@Index(name = "idx_plg_im_internal", columnList = "type,pluginId,internalId"),
@Index(name = "idx_plg_im_external", columnList = "type,pluginId,externalId"),
})
@Schema(description = "插件数据ID映射表")
@EnableEntityEvent
public class PluginDataIdMappingEntity extends GenericEntity<String> {
@Column(length = 32, updatable = false, nullable = false)
@Schema(description = "数据类型")
private String type;
@Column(length = 32, nullable = false, updatable = false)
@Schema(description = "插件ID")
private String pluginId;
@Column(length = 64, updatable = false, nullable = false)
@Schema(description = "内部数据ID")
private String internalId;
@Column(length = 64, updatable = false, nullable = false)
@Schema(description = "插件数据ID")
private String externalId;
@Override
public String getId() {
if (StringUtils.isEmpty(super.getId())) {
setId(
DigestUtils.md5Hex(
String.join("|", type, pluginId, internalId, externalId)
)
);
}
return super.getId();
}
public static PluginDataIdMappingEntity of(String pluginId,
String internalId,
String type,
String externalId) {
PluginDataIdMappingEntity entity = new PluginDataIdMappingEntity();
entity.setPluginId(pluginId);
entity.setType(type);
entity.setInternalId(internalId);
entity.setExternalId(externalId);
return entity;
}
}

View File

@ -0,0 +1,227 @@
package org.jetlinks.community.plugin.impl.jar;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.exception.I18nSupportException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.utils.ClassUtils;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.community.io.file.FileInfo;
import org.jetlinks.community.io.utils.FileUtils;
import org.jetlinks.community.plugin.PluginDriverConfig;
import org.jetlinks.community.plugin.impl.PluginDriverInstallerProvider;
import org.jetlinks.community.utils.ReactorUtils;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.supports.protocol.validator.MethodDeniedClassVisitor;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static java.nio.file.StandardOpenOption.*;
/**
* 基于jar包的方式加载驱动.
* <p>
* {@link PluginDriverConfig#getConfiguration()}配置中设置<code>location</code>来指定jar包的文件访问地址(http).
* <p>
* 首次加载时,会将文件下载到本地临时目录,文件名格式为<code>{id}_{md5(location)}.jar</code>.
* 如果文件地址变更或者文件不存在,会重新进行下载.
* <p>
* <h3>加载逻辑</h3>
* 优先使用{@link ServiceLoader}来加载驱动: 通过配置<code>META-INF/services/org.jetlinks.plugin.core.PluginDriver</code>
* 指定驱动实现类.
* <p>
* 如果未配置,则将扫描整个jar包中的{@link PluginDriver}的实现类并初始化.
* </p>
*
* @author zhouhao
* @since 2.0
*/
@Slf4j
public class JarPluginDriverInstallerProvider implements PluginDriverInstallerProvider {
public static final String PROVIDER = "jar";
private static final String TEMP_ID = "temp";
public static final String LOCATION_KEY = "location";
@Setter
private Duration loadTimeout = TimeUtils.parse(System.getProperty("jetlinks.plugin.load.timeout", "1m"));
//保存驱动文件的临时目录
private final File tempPath = new File("./data/plugin-drivers");
//已经加载的驱动ClassLoader
private final Map<String, PluginClassLoader> pluginLoaders = new ConcurrentHashMap<>();
private final MethodDeniedClassVisitor classVisitor;
private final WebClient webClient;
public JarPluginDriverInstallerProvider(WebClient.Builder builder) {
this.classVisitor = MethodDeniedClassVisitor.global();
this.webClient = builder.build();
tempPath.mkdirs();
}
@Override
public String provider() {
return PROVIDER;
}
private Mono<File> loadDriverFilePath(PluginDriverConfig config) {
String location = Optional
.ofNullable(config.getConfiguration().get(LOCATION_KEY))
.map(String::valueOf)
.orElseThrow(() -> new IllegalArgumentException("location can not be null"));
//远程文件则先下载再加载
if (StringUtils.hasText(location) && location.startsWith("http")) {
String urlMd5 = DigestUtils.md5Hex(location);
//地址没变则直接加载本地文件
File file = new File(tempPath, config.getId() + "_" + urlMd5 + ".jar");
if (file.exists()) {
// TODO: 2023/2/8 校验MD5
return Mono.just(file);
}
return FileUtils
.readDataBuffer(webClient,location)
.as(dataStream -> {
if (log.isDebugEnabled()) {
log.debug("download protocol file {} to {}", location, file);
}
//写出文件
return DataBufferUtils
.write(dataStream, file.toPath(), CREATE, WRITE,TRUNCATE_EXISTING)
.thenReturn(file);
})
//使用弹性线程池来写出文件
.subscribeOn(Schedulers.boundedElastic())
.timeout(loadTimeout,
Mono.error(() -> new I18nSupportException
.NoStackTrace("error.load_plugin_file_timeout", location)))
//失败时删除文件
.doOnError(err -> file.delete())
.thenReturn(file);
}
return Mono.empty();
}
private Mono<PluginDriver> loadFromFile(PluginDriverConfig config,
File file) {
return Mono.defer(() -> {
PluginClassLoader classLoader = pluginLoaders.compute(config.getId(), (id, old) -> {
if (old != null) {
closeClassLoader(old);
}
return createClassLoader(file);
});
PluginDriver driver = lookupDriver(classLoader);
if (driver == null) {
return Mono
.error(() -> new BusinessException
.NoStackTrace("error.please_upload_the_correct_plugin_package", 400)
);
}
return Mono.just(driver);
});
}
protected void closeClassLoader(PluginClassLoader loader) {
if (null == loader) {
return;
}
try {
loader.close();
} catch (IOException ignore) {
}
}
@Override
public Mono<PluginDriver> reload(PluginDriver driver, PluginDriverConfig config) {
ReactorUtils.dispose(driver);
return install(config);
}
protected PluginDriver lookupDriver(PluginClassLoader classLoader) {
//扫描
PluginDriver scan = ClassUtils
.findImplClass(PluginDriver.class,
"classpath:**/*.class",
true,
classLoader,
(loader, name, clazz) -> classVisitor.validate(name, clazz),
(loader, name, clazz) -> loader.loadSelfClass(name))
.orElse(null);
try {
Iterator<PluginDriver> driverIterator = ServiceLoader.load(PluginDriver.class, classLoader).iterator();
if (driverIterator.hasNext()) {
return driverIterator.next();
}
} catch (Throwable ignore) {
}
return scan;
}
@SneakyThrows
protected PluginClassLoader createClassLoader(File location) {
return new PluginClassLoader(new URL[]{location.toURI().toURL()}, this.getClass().getClassLoader());
}
@Override
public Mono<PluginDriver> install(PluginDriverConfig config) {
return loadDriverFilePath(config)
.flatMap(file -> loadFromFile(config, file))
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<Void> uninstall(PluginDriverConfig config) {
return Mono
.<Void>fromCallable(() -> {
//关闭classloader
closeClassLoader(pluginLoaders.remove(config.getId()));
//删除驱动文件
Files.walk(tempPath.toPath())
.forEach(path -> {
File file = path.toFile();
if (file.getName().startsWith(config.getId() + "_")) {
file.delete();
}
});
return null;
})
.subscribeOn(Schedulers.boundedElastic());
}
/**
* 创建临时的插件配置
*
* @param fileInfo 文件信息
* @return 插件配置
*/
public static PluginDriverConfig tempConfig(FileInfo fileInfo) {
PluginDriverConfig config = new PluginDriverConfig();
config.setId(IDGenerator.RANDOM.generate() + "-" + TEMP_ID);
config.setProvider(PROVIDER);
config.setConfiguration(Collections.singletonMap(LOCATION_KEY, fileInfo.getAccessUrl()));
return config;
}
}

View File

@ -0,0 +1,61 @@
package org.jetlinks.community.plugin.impl.jar;
import lombok.Getter;
import lombok.SneakyThrows;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
public class PluginClassLoader extends URLClassLoader {
@Getter
private final URL[] urls;
public PluginClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
this.urls = urls;
}
@Override
public void close() throws IOException {
super.close();
}
@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
try {
Class<?> clazz = loadSelfClass(name);
if (null != clazz) {
if (resolve) {
resolveClass(clazz);
}
return clazz;
}
} catch (Throwable ignore) {
}
return super.loadClass(name, resolve);
}
@SneakyThrows
public Class<?> loadSelfClass(String name) {
Class<?> clazz = super.findLoadedClass(name);
if (clazz != null) {
return clazz;
}
clazz = super.findClass(name);
resolveClass(clazz);
return clazz;
}
@Override
public URL getResource(String name) {
if (StringUtils.isEmpty(name)) {
return urls[0];
}
return super.findResource(name);
}
}

View File

@ -0,0 +1,60 @@
package org.jetlinks.community.plugin.monitor;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.lang.SeparatedCharSequence;
import org.jetlinks.core.lang.SharedPathString;
import org.jetlinks.core.monitor.Monitor;
import org.jetlinks.community.monitor.AbstractEventMonitor;
import org.slf4j.LoggerFactory;
@AllArgsConstructor
public class PluginMonitorHelper {
private static final SharedPathString ALL_PLUGIN_LOGGER =
SharedPathString.of("/_monitor/plugin/*/*/logger");
static final SharedPathString tracePrefix = SharedPathString.of("/plugin/*/*");
static SeparatedCharSequence createLoggerTopicPrefix(String type, String pluginId) {
return ALL_PLUGIN_LOGGER.replace(3, type, 4, pluginId);
}
public static SeparatedCharSequence createLoggerTopic(String type, String pluginId, String level) {
return createLoggerTopicPrefix(type, pluginId).append(level);
}
/**
* 创建插件监控
* <p>
* 可通过{@link PluginMonitorHelper#createLoggerTopic(String, String, String)}来订阅日志.
*
* @param eventBus 事件总线
* @param type 插件类型
* @param pluginId 插件ID
* @return 插件监控
*/
public static Monitor createMonitor(EventBus eventBus, String type, String pluginId) {
return new PluginMonitor(eventBus, type, pluginId);
}
@Getter
private final static class PluginMonitor extends AbstractEventMonitor {
private final org.slf4j.Logger logger;
private PluginMonitor(EventBus eventBus, String type, String id) {
super(eventBus,
tracePrefix.replace(2, type, 3, id),
createLoggerTopicPrefix(type, id)
);
logger = LoggerFactory.getLogger("org.jetlinks.plugin.monitor." + type);
}
@Override
protected CharSequence getLogType() {
return this.loggerEventPrefix.range(4, this.loggerEventPrefix.size() - 1);
}
}
}

View File

@ -0,0 +1,83 @@
package org.jetlinks.community.plugin.utils;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.message.*;
import org.jetlinks.plugin.core.Plugin;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import org.jetlinks.community.plugin.device.ExternalDeviceOperator;
import org.jetlinks.community.plugin.device.ExternalDeviceProductOperator;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;
import reactor.function.Function3;
public class PluginUtils {
public static Mono<DeviceProductOperator> transformToExternalProduct(PluginDataIdMapper idMapper,
Plugin plugin,
DeviceProductOperator product) {
return idMapper
.getExternalId(PluginDataIdMapper.TYPE_PRODUCT, plugin.getId(), product.getId())
.map(ext -> new ExternalDeviceProductOperator(ext, product));
}
public static Mono<DeviceOperator> transformToExternalDevice(PluginDataIdMapper idMapper,
Plugin plugin,
DeviceOperator device) {
return idMapper
.getExternalId(PluginDataIdMapper.TYPE_DEVICE, plugin.getId(), device.getDeviceId())
.map(ext -> new ExternalDeviceOperator(ext, plugin.getId(), idMapper, device));
}
public static <T extends ThingMessage> Mono<T> transformToInternalMessage(PluginDataIdMapper idMapper,
Plugin plugin,
T message) {
return transformMessage(plugin, message, idMapper::getInternalId);
}
public static <T extends ThingMessage> Mono<T> transformToExternalMessage(PluginDataIdMapper idMapper,
Plugin plugin,
T message) {
return transformMessage(plugin, message, idMapper::getExternalId);
}
private static <T extends ThingMessage> Mono<T> transformMessage(Plugin plugin,
T message,
Function3<String, String, String, Mono<String>> mapper) {
Assert.hasText(message.getThingId(),
() -> message.getThingType() + "Id must not be empty");
DeviceMessage child = null;
if (message instanceof ChildDeviceMessage) {
Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();
if (msg instanceof DeviceMessage) {
child = ((DeviceMessage) msg);
}
} else if (message instanceof ChildDeviceMessageReply) {
Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
if (msg instanceof DeviceMessage) {
child = ((DeviceMessage) msg);
}
}
if (child != null) {
return transform0(plugin, child, mapper)
.then(transform0(plugin, message, mapper));
}
return transform0(plugin, message, mapper);
}
private static <T extends ThingMessage> Mono<T> transform0(Plugin plugin,
T message,
Function3<String, String, String, Mono<String>> mapper) {
return mapper
.apply(message.getThingType(), plugin.getId(), message.getThingId())
.map(internalId -> {
message.thingId(message.getThingType(), internalId);
return message;
});
}
}

View File

@ -0,0 +1,117 @@
package org.jetlinks.community.plugin.web;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.authorization.annotation.QueryAction;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.annotation.SaveAction;
import org.jetlinks.community.plugin.impl.id.PluginDataIdMappingEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
/**
* 插件数据ID映射.
*
* @author zhangji 2023/3/2
*/
@AllArgsConstructor
@RestController
@RequestMapping("/plugin/mapping")
@Resource(id = "plugin-driver", name = "插件驱动管理")
@Tag(name = "插件数据ID映射")
public class PluginDataIdMappingController {
private final ReactiveRepository<PluginDataIdMappingEntity, String> repository;
@PatchMapping("/{type}/{pluginId:.+}/{internalId:.+}")
@SaveAction
@Operation(summary = "保存数据ID映射")
public Mono<SaveResult> save(@PathVariable @Parameter(description = "插件数据类型") String type,
@PathVariable @Parameter(description = "插件ID") String pluginId,
@PathVariable @Parameter(description = "内部数据ID") String internalId,
@RequestBody Mono<String> externalId) {
return externalId
.map(id -> PluginDataIdMappingEntity.of(pluginId, internalId, type, id))
.flatMap(entity -> repository
.createDelete()
.where(PluginDataIdMappingEntity::getPluginId, pluginId)
.and(PluginDataIdMappingEntity::getInternalId, internalId)
.and(PluginDataIdMappingEntity::getType, type)
.execute()
.thenReturn(entity))
.flatMap(repository::save);
}
@GetMapping("/{type}/{pluginId:.+}/{internalId:.+}")
@QueryAction
@Operation(summary = "获取数据ID映射")
public Mono<PluginDataIdMappingEntity> queryOne(@PathVariable @Parameter(description = "插件数据类型") String type,
@PathVariable @Parameter(description = "插件ID") String pluginId,
@PathVariable @Parameter(description = "内部数据ID") String internalId) {
return repository
.createQuery()
.where(PluginDataIdMappingEntity::getPluginId, pluginId)
.and(PluginDataIdMappingEntity::getInternalId, internalId)
.and(PluginDataIdMappingEntity::getType, type)
.fetchOne();
}
@PostMapping("/{type}/_all")
@QueryAction
@Operation(summary = "获取指定类型的所有ID映射")
@Deprecated
public Flux<PluginDataIdMappingEntity> queryAll(@PathVariable @Parameter(description = "插件数据类型") String type,
@RequestBody(required = false)
@Parameter(description = "指定查询的插件数据ID") Mono<List<String>> includes) {
return includes
.flatMapMany(externalIds -> repository
.createQuery()
.where(PluginDataIdMappingEntity::getType, type)
.when(!CollectionUtils.isEmpty(externalIds),
query -> query.in(PluginDataIdMappingEntity::getExternalId, externalIds))
.fetch());
}
@PostMapping("/{type}/{pluginId:.+}/_all")
@QueryAction
@Operation(summary = "获取指定类型的所有ID映射")
public Flux<PluginDataIdMappingEntity> queryAll(@PathVariable @Parameter(description = "插件数据类型") String type,
@PathVariable @Parameter(description = "插件ID") String pluginId,
@RequestBody(required = false)
@Parameter(description = "指定查询的插件数据ID") Mono<List<String>> includes) {
return includes
.flatMapMany(externalIds -> repository
.createQuery()
.where(PluginDataIdMappingEntity::getType, type)
.and(PluginDataIdMappingEntity::getPluginId, pluginId)
.when(!CollectionUtils.isEmpty(externalIds),
query -> query.in(PluginDataIdMappingEntity::getExternalId, externalIds))
.fetch());
}
@PostMapping("/_query")
@QueryAction
@Operation(summary = "动态查询ID映射")
public Flux<PluginDataIdMappingEntity> query(@RequestBody(required = false) Mono<QueryParamEntity> queryParam) {
return queryParam
.flatMapMany(query -> repository
.createQuery()
.setParam(query)
.fetch());
}
}

View File

@ -0,0 +1,286 @@
package org.jetlinks.community.plugin.web;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.hswebframework.web.authorization.annotation.QueryAction;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.annotation.SaveAction;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.hswebframework.web.exception.ValidationException;
import org.hswebframework.web.i18n.LocaleUtils;
import org.hswebframework.web.validator.CreateGroup;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.utils.TypeScriptUtils;
import org.jetlinks.plugin.core.Description;
import org.jetlinks.plugin.core.PluginDriver;
import org.jetlinks.plugin.core.PluginType;
import org.jetlinks.plugin.internal.device.DeviceGatewayPluginDriver;
import org.jetlinks.plugin.internal.device.DeviceProduct;
import org.jetlinks.community.io.file.FileInfo;
import org.jetlinks.community.io.file.FileManager;
import org.jetlinks.community.io.file.FileOption;
import org.jetlinks.community.io.utils.FileUtils;
import org.jetlinks.community.plugin.PluginDriverConfig;
import org.jetlinks.community.plugin.PluginDriverInstaller;
import org.jetlinks.community.plugin.PluginDriverManager;
import org.jetlinks.community.plugin.impl.PluginDriverEntity;
import org.jetlinks.community.plugin.impl.PluginDriverService;
import org.jetlinks.community.plugin.impl.jar.JarPluginDriverInstallerProvider;
import org.jetlinks.community.web.response.ValidationResult;
import org.springframework.http.HttpStatus;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.function.Function;
@AllArgsConstructor
@RestController
@RequestMapping("/plugin/driver")
@Resource(id = "plugin-driver", name = "插件驱动管理")
@Tag(name = "插件驱动管理")
public class PluginDriverController implements ReactiveServiceCrudController<PluginDriverEntity, String> {
private final PluginDriverService service;
private final PluginDriverManager driverManager;
private final FileManager fileManager;
private final PluginDriverInstaller installer;
private final Set<String> allowResourceExt = new HashSet<>(
Arrays.asList(
"js", "ts", "vue", "json",
"md", "html", "css",
"ttf", "woff", "woff2", "zip",
"jpg", "jpeg", "png", "gif"
)
);
@Override
public ReactiveCrudService<PluginDriverEntity, String> getService() {
return service;
}
@GetMapping("/types")
@QueryAction
public Flux<PluginTypeInfo> getPluginTypes() {
return driverManager
.getDrivers()
.map(PluginDriver::getType)
.distinct(PluginType::getId)
.map(PluginTypeInfo::of);
}
@GetMapping("/{driverId}/**")
@QueryAction
public Mono<Void> getResource(@PathVariable String driverId,
ServerWebExchange exchange) {
String _path = exchange.getRequest().getPath().value();
_path = _path.substring(_path.indexOf(driverId) + driverId.length() + 1);
String path = _path;
String ext = FileUtils.getExtension(path);
//校验是否允许获取的文件拓展名
if (!allowResourceExt.contains(ext)) {
exchange
.getResponse()
.setStatusCode(HttpStatus.NOT_FOUND);
return Mono.empty();
}
exchange.getResponse()
.getHeaders()
.setContentType(FileUtils.getMediaTypeByExtension(ext));
return driverManager
.getDriver(driverId)
.map(driver -> exchange.getResponse().writeWith(driver.getResource(path)))
.switchIfEmpty(Mono.fromRunnable(() -> exchange
.getResponse()
.setStatusCode(HttpStatus.NOT_FOUND)))
.flatMap(Function.identity());
}
@GetMapping("/{driverId}/description")
@QueryAction
@Operation(summary = "获取插件详情")
public Mono<Description> description(@PathVariable String driverId) {
return driverManager
.getDriver(driverId)
.map(PluginDriver::getDescription);
}
@GetMapping("/{driverId}/products")
@QueryAction
@Operation(summary = "获取插件支持的产品信息")
public Flux<DeviceProduct> deviceProduct(@PathVariable String driverId) {
return driverManager
.getDriver(driverId)
.filter(driver -> driver.isWrapperFor(DeviceGatewayPluginDriver.class))
.flatMapMany(driver -> driver
.isWrapperFor(DeviceGatewayPluginDriver.class)
? driver.unwrap(DeviceGatewayPluginDriver.class).getSupportProducts()
: Flux.empty());
}
@PostMapping("/upload")
@SaveAction
@Operation(summary = "上传文件,返回插件信息")
public Mono<PluginDriverUploadInfo> upload(@RequestPart("file") Mono<FilePart> partMono) {
return partMono
.flatMap(fileManager::saveFile)
.flatMap(fileInfo -> {
PluginDriverUploadInfo info = new PluginDriverUploadInfo().with(fileInfo);
PluginDriverConfig config = JarPluginDriverInstallerProvider.tempConfig(fileInfo);
return installer
.install(config)
.doOnNext(info::with)
.then(installer.uninstall(config))
.thenReturn(info);
});
}
@PostMapping("/convert")
@SaveAction
@Operation(summary = "获取插件详情信息")
public Mono<PluginDriverUploadInfo> convertToDetail(@RequestBody Mono<PluginDriverEntity> pluginDriver) {
return pluginDriver
.map(PluginDriverEntity::toConfig)
.doOnNext(config -> config.setId("_debug"))
.flatMap(def -> {
PluginDriverUploadInfo uploadInfo = new PluginDriverUploadInfo();
return installer
.install(def)
.map(uploadInfo::with)
.flatMap(info -> installer
.uninstall(def)
.thenReturn(info));
});
}
@GetMapping("/id/_validate")
@QueryAction
@Operation(summary = "验证插件ID是否合法")
public Mono<ValidationResult> idValidate(@RequestParam @Parameter(description = "插件ID") String id) {
PluginDriverEntity entity = new PluginDriverEntity();
entity.setId(id);
entity.tryValidate("id", CreateGroup.class);
return service
.findById(id)
.flatMap(ignore -> LocaleUtils.resolveMessageReactive("error.plugin_driver_id_already_exists"))
.map(ValidationResult::error)
.defaultIfEmpty(ValidationResult.success())
.onErrorResume(ValidationException.class, e -> Mono.just(e.getI18nCode())
.map(ValidationResult::error));
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public static class PluginTypeInfo {
private String id;
private String name;
public String getValue() {
return id;
}
public String getText() {
return name;
}
public static PluginTypeInfo of(PluginType type) {
return new PluginTypeInfo(type.getId(), type.getName());
}
}
@Setter
@Getter
public static class PluginDriverUploadInfo {
@Schema(description = "插件ID")
private String id;
@Schema(description = "插件名称")
private String name;
@Schema(description = "插件说明")
private String description;
@Schema(description = "插件版本")
private String version;
@Schema(description = "插件类型")
private PluginTypeInfo type;
@Schema(description = "文件访问地址")
private String accessUrl;
@Schema(description = "文件名称")
private String filename;
@Schema(description = "文件后缀")
private String extension;
@Schema(description = "文件长度")
private long length;
@Schema(description = "md5")
private String md5;
@Schema(description = "sha256")
private String sha256;
@Schema(description = "创建时间")
private long createTime;
@Schema(description = "创建者ID")
private String creatorId;
@Schema(description = "文件配置")
private FileOption[] options;
@Schema(description = "其他信息")
private Map<String, Object> others;
public PluginDriverUploadInfo with(FileInfo fileInfo) {
FastBeanCopier.copy(fileInfo, this, "id", "name");
setFilename(fileInfo.getName());
return this;
}
public PluginDriverUploadInfo with(PluginDriver pluginDriver) {
setId(pluginDriver.getDescription().getId());
setName(pluginDriver.getDescription().getName());
setDescription(pluginDriver.getDescription().getDescription());
setVersion(pluginDriver.getDescription().getVersion().toString());
setType(PluginTypeInfo.of(pluginDriver.getType()));
setOthers(pluginDriver.getDescription().getOthers());
return this;
}
}
}

View File

@ -0,0 +1,2 @@
org.jetlinks.community.plugin.configuration.PluginAutoConfiguration
org.jetlinks.community.plugin.configuration.PluginAutoConfiguration.DevicePluginAutoConfiguration

View File

@ -0,0 +1,11 @@
#error
error.unable_to_load_plugin_driver=Unable to load plugin driver
error.plugin_driver_referenced=Delete failed, plugin has been bound by device access gateway or referenced by other data
error.plugin_driver_id_already_exists=The plugin driver id already exists
error.create_plugin_error=Create Plugin Error: {0}
error.please_upload_the_correct_plugin_package=Please upload the correct plugin package
error.load_plugin_file_timeout=Load plugin file timeout
hswebframework.web.system.action.command=Execute Command
hswebframework.web.system.permission.standalone-plugin=Standalone Plugin Management
hswebframework.web.system.permission.plugin-driver=Plugin Driver Management

View File

@ -0,0 +1,10 @@
error.unable_to_load_plugin_driver=\u65E0\u6CD5\u52A0\u8F7D\u63D2\u4EF6\u9A71\u52A8
error.plugin_driver_referenced=\u5220\u9664\u5931\u8D25\uFF0C\u63D2\u4EF6\u5DF2\u88AB\u8BBE\u5907\u63A5\u5165\u7F51\u5173\u7ED1\u5B9A\u6216\u5DF2\u88AB\u5176\u4ED6\u6570\u636E\u5F15\u7528
error.plugin_driver_id_already_exists=\u63D2\u4EF6ID\u5DF2\u5B58\u5728
error.create_plugin_error=\u521B\u5EFA\u63D2\u4EF6\u5931\u8D25:{0}
error.please_upload_the_correct_plugin_package=\u8BF7\u4E0A\u4F20\u6B63\u786E\u7684\u63D2\u4EF6\u5305
error.load_plugin_file_timeout=\u52A0\u8F7D\u63D2\u4EF6\u6587\u4EF6\u8D85\u65F6
hswebframework.web.system.action.command=\u6267\u884C\u547D\u4EE4
hswebframework.web.system.permission.standalone-plugin=\u72EC\u7ACB\u63D2\u4EF6\u7BA1\u7406
hswebframework.web.system.permission.plugin-driver=\u63D2\u4EF6\u9A71\u52A8\u7BA1\u7406

View File

@ -0,0 +1 @@
<?xml version="1.0" standalone="no"?><!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"><svg t="1700809854285" class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="2253" width="60" height="60" xmlns:xlink="http://www.w3.org/1999/xlink"><path d="M810.666667 597.333333h-85.333334v-64a21.333333 21.333333 0 0 0-21.333333-21.333333h-85.333333a21.333333 21.333333 0 0 0-21.333334 21.333333V597.333333h-170.666666v-64a21.333333 21.333333 0 0 0-21.333334-21.333333h-85.333333a21.333333 21.333333 0 0 0-21.333333 21.333333V597.333333H213.333333a42.666667 42.666667 0 0 0-42.666666 42.666667v256a42.666667 42.666667 0 0 0 42.666666 42.666667h597.333334a42.666667 42.666667 0 0 0 42.666666-42.666667v-256a42.666667 42.666667 0 0 0-42.666666-42.666667z m-42.666667 256H256v-170.666666h512z m42.666667-768H213.333333a42.666667 42.666667 0 0 0-42.666666 42.666667v256a42.666667 42.666667 0 0 0 42.666666 42.666667h597.333334a42.666667 42.666667 0 0 0 42.666666-42.666667V128a42.666667 42.666667 0 0 0-42.666666-42.666667z m-42.666667 256H256V170.666667h512z" p-id="2254" fill="#ffffff"></path></svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@ -30,6 +30,7 @@
<module>tdengine-component</module>
<module>timescaledb-component</module>
<module>datasource-component</module>
<module>plugin-component</module>
</modules>
<artifactId>jetlinks-components</artifactId>

View File

@ -346,6 +346,12 @@
<artifactId>tdengine-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>plugin-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

View File

@ -24,6 +24,9 @@ public class JetLinksApplication {
public static void main(String[] args) {
SpringApplication.run(JetLinksApplication.class, args);
System.out.println("=======================启动成功==========================");
System.out.println(" 管理员用户名: admin ");
System.out.println(" 管理员初始密码: 请查看ADMIN_USER_PASSWORD配置");
}

View File

@ -325,6 +325,12 @@
<type>pom</type>
</dependency>
<dependency>
<groupId>org.jetlinks.plugin</groupId>
<artifactId>plugin-internal</artifactId>
<version>${jetlinks.plugin.version}</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>