Compare commits

...

14 Commits
2.11 ... 1.20

Author SHA1 Message Date
zhouhao 477ecd911a fix: 优化tcp client逻辑 2024-06-12 09:34:48 +08:00
zhouhao 29174496b6 fix: 修复处理keepOnline会话错误问题 2024-06-12 09:30:41 +08:00
zhouhao 9711f857d2 fix: maven repo 2024-06-11 10:37:54 +08:00
zhouhao 6c29d45d67 fix: maven repo 2024-06-11 10:37:25 +08:00
zhouhao c78e3795a9 refactor: 优化tcp设备接入网关逻辑 2024-06-11 10:33:08 +08:00
zhouhao f3a5fb2bff refactor: 优化mqtt设备接入 2024-03-07 10:20:47 +08:00
zhouhao 101aa5bfa7 refactor: 优化文件上传逻辑 2024-02-20 14:25:12 +08:00
老周 f524443ba3
Update PersistenceDeviceSessionManager.java 2023-06-30 19:56:17 +08:00
老周 44f187d5db
Update PersistenceDeviceSessionManager.java 2023-06-30 19:53:52 +08:00
zhouhao 393930e72b fix: compile error 2023-06-10 17:28:51 +08:00
zhouhao 7075c29740 Merge branch '1.20-release' into 1.20 2023-06-10 17:27:50 +08:00
zhouhao 7a0ddf7aae 升级依赖 2023-02-10 13:48:28 +08:00
zhouhao 9c7a7a5afb 优化配置 2022-07-30 11:36:08 +08:00
zhouhao b459f66925 增加api相关配置 2022-07-30 11:35:56 +08:00
32 changed files with 670 additions and 278 deletions

View File

@ -1 +1 @@
distributionUrl=https://downloads.apache.org/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip
distributionUrl=https://archive.apache.org/dist/maven/maven-3/3.9.3/binaries/apache-maven-3.9.3-bin.zip

View File

@ -1,10 +1,13 @@
package org.jetlinks.community;
import lombok.Generated;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.message.HeaderKey;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
/**
* @author wangzheng
@ -24,8 +27,14 @@ public interface PropertyConstants {
return Optional.ofNullable((T) map.get(key.getKey()));
}
@Generated
interface Key<V> extends ConfigKey<V>, HeaderKey<V> {
@Override
default Type getValueType() {
return ConfigKey.super.getValueType();
}
@Override
default Class<V> getType() {
return ConfigKey.super.getType();
@ -45,5 +54,52 @@ public interface PropertyConstants {
};
}
static <T> Key<T> of(String key, T defaultValue) {
return new Key<T>() {
@Override
public String getKey() {
return key;
}
@Override
public T getDefaultValue() {
return defaultValue;
}
};
}
static <T> Key<T> of(String key, Supplier<T> defaultValue) {
return new Key<T>() {
@Override
public String getKey() {
return key;
}
@Override
public T getDefaultValue() {
return defaultValue.get();
}
};
}
static <T> Key<T> of(String key, Supplier<T> defaultValue, Type type) {
return new Key<T>() {
@Override
public Type getValueType() {
return type;
}
@Override
public String getKey() {
return key;
}
@Override
public T getDefaultValue() {
return defaultValue.get();
}
};
}
}
}

View File

@ -98,9 +98,9 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
public void shutdown() {
super.shutdown();
Flux.fromIterable(localSessions.values())
.flatMap(Function.identity())
.filter(session -> session.isWrapFrom(PersistentSession.class))
.map(session -> session.unwrap(PersistentSession.class))
.filter(ref -> ref.loaded != null)
.filter(ref -> ref.loaded.isWrapFrom(PersistentSession.class))
.map(ref -> ref.loaded.unwrap(PersistentSession.class))
.as(this::tryPersistent)
.block();
repository.store.compactMoveChunks();
@ -138,11 +138,14 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
}
Mono<Void> resumeSession(PersistentSessionEntity entity) {
return entity
return entity
.toSession(registry.get())
.doOnNext(session -> {
log.debug("resume session[{}]", session.getDeviceId());
localSessions.putIfAbsent(session.getDeviceId(), Mono.just(session));
localSessions.putIfAbsent(session.getDeviceId(),
new DeviceSessionRef(session.getDeviceId(),
this,
session));
})
.onErrorResume((err) -> {
log.debug("resume session[{}] error", entity.getDeviceId(), err);

View File

@ -4,18 +4,20 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.scalecube.services.annotations.Service;
import io.scalecube.services.annotations.ServiceMethod;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.exception.NotFoundException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.rpc.RpcManager;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.buffer.*;
import org.springframework.http.codec.multipart.FilePart;
@ -28,12 +30,13 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.time.Duration;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.function.Function;
@Slf4j
public class ClusterFileManager implements FileManager {
private final FileProperties properties;
@ -55,8 +58,8 @@ public class ClusterFileManager implements FileManager {
}
@Override
public Mono<FileInfo> saveFile(FilePart filePart) {
return saveFile(filePart.filename(), filePart.content());
public Mono<FileInfo> saveFile(FilePart filePart, FileOption... options) {
return saveFile(filePart.filename(), filePart.content(), options);
}
private DataBuffer updateDigest(MessageDigest digest, DataBuffer dataBuffer) {
@ -66,7 +69,7 @@ public class ClusterFileManager implements FileManager {
return dataBuffer;
}
public Mono<FileInfo> doSaveFile(String name, Flux<DataBuffer> stream) {
public Mono<FileInfo> doSaveFile(String name, Flux<DataBuffer> stream, FileOption... options) {
LocalDate now = LocalDate.now();
FileInfo fileInfo = new FileInfo();
fileInfo.setId(IDGenerator.MD5.generate());
@ -85,18 +88,20 @@ public class ClusterFileManager implements FileManager {
.map(buffer -> updateDigest(md5, updateDigest(sha256, buffer)))
.as(buf -> DataBufferUtils
.write(buf, path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
StandardOpenOption.TRUNCATE_EXISTING))
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
StandardOpenOption.TRUNCATE_EXISTING))
.then(Mono.defer(() -> {
File savedFile = Paths.get(storageBasePath, storagePath).toFile();
if (!savedFile.exists()) {
return Mono.error(new BusinessException("error.file_storage_failed"));
}
fileInfo.withAccessKey(IDGenerator.MD5.generate());
fileInfo.setMd5(ByteBufUtil.hexDump(md5.digest()));
fileInfo.setSha256(ByteBufUtil.hexDump(sha256.digest()));
fileInfo.setLength(savedFile.length());
fileInfo.setCreateTime(System.currentTimeMillis());
fileInfo.setOptions(options);
FileEntity entity = FileEntity.of(fileInfo, storagePath, serverNodeId);
return repository
.insert(entity)
@ -105,8 +110,26 @@ public class ClusterFileManager implements FileManager {
}
@Override
public Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream) {
return doSaveFile(name, stream);
public Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream, FileOption... options) {
return doSaveFile(name, stream, options);
}
@Override
public Mono<FileInfo> getFileByMd5(String md5) {
return repository
.createQuery()
.where(FileEntity::getMd5, md5)
.fetchOne()
.map(FileEntity::toInfo);
}
@Override
public Mono<FileInfo> getFileBySha256(String sha256) {
return repository
.createQuery()
.where(FileEntity::getSha256, sha256)
.fetchOne()
.map(FileEntity::toInfo);
}
@Override
@ -119,9 +142,9 @@ public class ClusterFileManager implements FileManager {
private Flux<DataBuffer> readFile(String filePath, long position) {
return DataBufferUtils
.read(new FileSystemResource(Paths.get(properties.getStorageBasePath(), filePath)),
position,
bufferFactory,
(int) properties.getReadBufferSize().toBytes())
position,
bufferFactory,
(int) properties.getReadBufferSize().toBytes())
.onErrorMap(NoSuchFileException.class, e -> new NotFoundException());
}
@ -168,6 +191,27 @@ public class ClusterFileManager implements FileManager {
});
}
@Override
public Mono<Integer> delete(String id) {
return doDelete(id);
}
public Mono<Integer> doDelete(String id) {
return repository
.deleteById(id);
}
@EventListener
public void handleDeleteEvent(EntityDeletedEvent<FileEntity> event) {
for (FileEntity fileEntity : event.getEntity()) {
File file = Paths.get(properties.getStorageBasePath(), fileEntity.getStoragePath()).toFile();
if (file.exists()) {
log.debug("delete file: {}", file.getAbsolutePath());
file.delete();
}
}
}
@AllArgsConstructor
private static class DefaultReaderContext implements ReaderContext {
private final FileInfo info;

View File

@ -6,10 +6,16 @@ import org.apache.commons.io.FilenameUtils;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Getter
@Setter
public class FileInfo {
public static final String OTHER_ACCESS_KEY = "accessKey";
private String id;
private String name;
@ -28,6 +34,10 @@ public class FileInfo {
private FileOption[] options;
private Map<String, Object> others;
private String accessUrl;
public MediaType mediaType() {
if (!StringUtils.hasText(extension)) {
return MediaType.APPLICATION_OCTET_STREAM;
@ -36,6 +46,14 @@ public class FileInfo {
case "jpg":
case "jpeg":
return MediaType.IMAGE_JPEG;
case "png":
return MediaType.IMAGE_PNG;
case "gif":
return MediaType.IMAGE_GIF;
case "mp4":
return MediaType.parseMediaType("video/mp4");
case "flv":
return MediaType.parseMediaType("video/x-flv");
case "text":
case "txt":
return MediaType.TEXT_PLAIN;
@ -46,10 +64,44 @@ public class FileInfo {
}
}
public boolean hasOption(FileOption option){
if(options==null){
return false;
}
for (FileOption fileOption : options) {
if(fileOption==option){
return true;
}
}
return false;
}
public FileInfo withFileName(String fileName) {
name = fileName;
extension = FilenameUtils.getExtension(fileName);
return this;
}
public synchronized FileInfo withOther(String key, Object value) {
if (others == null) {
others = new HashMap<>();
}
others.put(key, value);
return this;
}
public FileInfo withAccessKey(String accessKey) {
withOther(OTHER_ACCESS_KEY, accessKey);
return this;
}
public Optional<String> accessKey() {
return Optional
.ofNullable(others)
.map(map -> map.get(OTHER_ACCESS_KEY))
.map(String::valueOf)
.filter(StringUtils::hasText);
}
}

View File

@ -7,23 +7,31 @@ import reactor.core.publisher.Mono;
import java.util.function.Function;
/**
* 文件管理器,统一管理文件信息
*/
public interface FileManager {
Mono<FileInfo> saveFile(FilePart filePart);
Mono<FileInfo> saveFile(FilePart filePart, FileOption... options);
Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream);
Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream, FileOption... options);
Mono<FileInfo> getFile(String id);
Mono<FileInfo> getFileByMd5(String md5);
Mono<FileInfo> getFileBySha256(String sha256);
Flux<DataBuffer> read(String id);
Flux<DataBuffer> read(String id, long position);
Flux<DataBuffer> read(String id,
Function<ReaderContext,Mono<Void>> beforeRead);
Function<ReaderContext, Mono<Void>> beforeRead);
interface ReaderContext{
Mono<Integer> delete(String id);
interface ReaderContext {
FileInfo info();
void position(long position);

View File

@ -1,14 +1,21 @@
package org.jetlinks.community.io.file.web;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.hswebframework.web.authorization.Authentication;
import org.hswebframework.web.authorization.annotation.Authorize;
import org.hswebframework.web.authorization.annotation.DeleteAction;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.exception.AccessDenyException;
import org.jetlinks.community.io.file.FileInfo;
import org.jetlinks.community.io.file.FileManager;
import org.jetlinks.community.io.file.FileOption;
import org.springframework.http.ContentDisposition;
import org.springframework.http.HttpRange;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@ -19,6 +26,8 @@ import java.util.List;
@RestController
@RequestMapping("/file")
@AllArgsConstructor
@Resource(id= "file-manager",name = "文件管理")
@Tag(name = "需身份认证的文件管理")
public class FileManagerController {
private final FileManager fileManager;
@ -31,7 +40,7 @@ public class FileManagerController {
}
@GetMapping("/{fileId}")
@Authorize(merge = false)
@Authorize(ignore = true)
@Operation(summary = "获取文件")
public Mono<Void> read(@PathVariable String fileId,
ServerWebExchange exchange) {
@ -40,28 +49,67 @@ public class FileManagerController {
.getResponse()
.writeWith(fileManager
.read(fileId, ctx -> {
List<HttpRange> ranges = exchange
.getRequest()
.getHeaders()
.getRange();
long position = 0;
if (ranges.size() != 0) {
position = ranges.get(0).getRangeStart(ctx.info().getLength());
Mono<Void> before;
//不是公开访问则需要登陆或者使用accessKey
if (!ctx.info().hasOption(FileOption.publicAccess)) {
String key = exchange.getRequest().getQueryParams().getFirst("accessKey");
//请求参数没有accessKey则校验当前用户是否登陆
if (!StringUtils.hasText(key)) {
before = Authentication
.currentReactive()
.switchIfEmpty(Mono.error(AccessDenyException::new))
.then();
} else {
//校验accessKey
if (ctx.info().accessKey().map(key::equalsIgnoreCase).orElse(false)) {
before = Mono.empty();
} else {
before = Mono.error(AccessDenyException::new);
}
}
} else {
before = Mono.empty();
}
ctx.position(position);
MediaType mediaType = ctx.info().mediaType();
exchange.getResponse().getHeaders().setContentType(mediaType);
exchange.getResponse().getHeaders().setContentLength(ctx.info().getLength());
//文件流时下载文件
if (mediaType.includes(MediaType.APPLICATION_OCTET_STREAM)) {
exchange.getResponse().getHeaders().setContentDisposition(
ContentDisposition
.builder("attachment")
.filename(ctx.info().getName(), StandardCharsets.UTF_8)
.build()
);
}
return Mono.empty();
return before.then(
Mono.fromRunnable(() -> {
List<HttpRange> ranges = exchange
.getRequest()
.getHeaders()
.getRange();
long position = 0;
if (ranges.size() != 0) {
position = ranges.get(0).getRangeStart(ctx.info().getLength());
}
ctx.position(position);
MediaType mediaType = ctx.info().mediaType();
exchange.getResponse().getHeaders().setContentType(mediaType);
exchange.getResponse().getHeaders().setContentLength(ctx.info().getLength());
exchange.getResponse().getHeaders().add("file-md5", ctx.info().getMd5());
exchange.getResponse().getHeaders().add("file-sha256", ctx.info().getSha256());
//文件流时下载文件
if (mediaType.includes(MediaType.APPLICATION_OCTET_STREAM)) {
exchange.getResponse().getHeaders().setContentDisposition(
ContentDisposition
.builder("attachment")
.filename(ctx.info().getName(), StandardCharsets.UTF_8)
.build()
);
}
})
);
}));
}
@DeleteMapping("/{fileId}")
@DeleteAction
@Operation(summary = "删除文件")
public Mono<Integer> delete(@PathVariable String fileId) {
return fileManager
.delete(fileId);
}
}

View File

@ -13,6 +13,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.community.utils.SystemUtils;
@ -112,27 +113,25 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
monitor.rejected();
}
return isStarted();
return true;
})
.publishOn(Schedulers.parallel())
//处理mqtt连接请求
.flatMap(this::handleConnection)
//处理认证结果
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
.contextWrite(ReactiveLogger.start("network", mqttServer.getId()))
.flatMap(connection -> this
.handleConnection(connection)
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()))
.onErrorResume(err -> {
log.error(err.getMessage(), err);
return Mono.empty();
}),
Integer.MAX_VALUE)
.subscribe();
}
//处理连接并进行认证
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
//内存不够了
if (SystemUtils.memoryIsOutOfWatermark()) {
//直接拒绝,响应SERVER_UNAVAILABLE,不再处理此连接
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return Mono.empty();
}
return Mono
.justOrEmpty(connection.getAuth())
.flatMap(auth -> {
@ -170,7 +169,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
//应答SERVER_UNAVAILABLE
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}))
.subscribeOn(Schedulers.parallel());
;
}
//处理认证结果
@ -190,7 +189,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
monitor.totalConnection(counter.sum());
sessionManager
.getSession(deviceId)
.getSession(deviceId, false)
.flatMap(_tmp -> {
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
@ -219,19 +218,21 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
})
.defaultIfEmpty(newSession);
})
.flatMap(session -> Mono.fromCallable(() -> {
.mapNotNull(session->{
try {
return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
} catch (IllegalStateException ignore) {
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
return null;
}
}))
})
.doOnNext(o -> {
//监控信息
monitor.connected();
monitor.totalConnection(counter.sum());
});
})
//会话empty说明注册会话失败?
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
} else {
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
@ -253,20 +254,16 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
DeviceOperator operator,
MqttConnectionSession session) {
return Flux
.usingWhen(Mono.just(connection),
MqttConnection::handleMessage,
MqttConnection::close)
//网关暂停或者已停止时,则不处理消息
.filter(pb -> isStarted())
.doOnNext(msg -> monitor.receivedMessage())
.publishOn(Schedulers.parallel())
//解码收到的mqtt报文
.flatMap(publishing -> this
.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
//应答MQTT(QoS1,2的场景)
.doOnSuccess(s -> publishing.acknowledge())
.concatMap(publishing -> this
.decodeAndHandleMessage(operator, session, publishing, connection)
)
//合并遗言消息
.mergeWith(
@ -282,30 +279,32 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
MqttConnectionSession session,
MqttMessage message,
MqttConnection connection) {
monitor.receivedMessage();
return operator
.getProtocol()
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
//解码
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry)))
.cast(DeviceMessage.class)
.flatMap(msg -> {
.concatMap(msg -> {
//回填deviceId,有的场景协议包不能或者没有解析出deviceId,则直接使用连接对应的设备id进行填充.
if (!StringUtils.hasText(msg.getDeviceId())) {
msg.thingId(DeviceThingType.device, operator.getDeviceId());
}
return this
.handleMessage(operator, msg, connection);
return this.handleMessage(operator, msg, connection);
})
.doOnComplete(() -> {
if (message instanceof MqttPublishing) {
((MqttPublishing) message).acknowledge();
}
})
.doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
.as(FluxTracer
.create(DeviceTracer.SpanName.decode(operator.getDeviceId()),
(span, msg) -> span.setAttribute(DeviceTracer.SpanKey.message, msg
.toJson()
.toJSONString())))
//发生错误不中断流
.onErrorResume((err) -> Mono.empty())
.then()
.subscribeOn(Schedulers.parallel());
.onErrorResume((err) -> {
log.error("handle mqtt message [{}] error:{}", operator.getDeviceId(), message, err);
return Mono.empty();
})
.then();
}
private Mono<DeviceMessage> handleMessage(DeviceOperator mainDevice,

View File

@ -23,6 +23,7 @@ import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.server.mqtt.MqttAuth;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.*;
import javax.annotation.Nonnull;
@ -41,7 +42,7 @@ class VertxMqttConnection implements MqttConnection {
private long keepAliveTimeoutMs;
@Getter
private long lastPingTime = System.currentTimeMillis();
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false;
private static final MqttAuth emptyAuth = new MqttAuth() {
@Override
public String getUsername() {
@ -53,19 +54,9 @@ class VertxMqttConnection implements MqttConnection {
return "";
}
};
private final Sinks.Many<MqttPublishing> messageProcessor = Sinks
.many()
.multicast()
.onBackpressureBuffer(Integer.MAX_VALUE);
private final Sinks.Many<MqttSubscription> subscription = Sinks
.many()
.multicast()
.onBackpressureBuffer(Integer.MAX_VALUE);
private final Sinks.Many<MqttUnSubscription> unsubscription = Sinks
.many()
.multicast()
.onBackpressureBuffer(Integer.MAX_VALUE);
private final Sinks.Many<MqttPublishing> messageProcessor = Reactors.createMany(Integer.MAX_VALUE, false);
private final Sinks.Many<MqttSubscription> subscription = Reactors.createMany(Integer.MAX_VALUE, false);
private final Sinks.Many<MqttUnSubscription> unsubscription = Reactors.createMany(Integer.MAX_VALUE, false);
public VertxMqttConnection(MqttEndpoint endpoint) {
@ -178,7 +169,7 @@ class VertxMqttConnection implements MqttConnection {
publishing.acknowledge();
}
if (hasDownstream) {
this.messageProcessor.tryEmitNext(publishing);
this.messageProcessor.emitNext(publishing, Reactors.emitFailureHandler());
}
})
//QoS 1 PUBACK
@ -211,7 +202,7 @@ class VertxMqttConnection implements MqttConnection {
subscription.acknowledge();
}
if (hasDownstream) {
this.subscription.tryEmitNext(subscription);
this.subscription.emitNext(subscription, Reactors.emitFailureHandler());
}
})
.unsubscribeHandler(msg -> {
@ -222,7 +213,7 @@ class VertxMqttConnection implements MqttConnection {
unSubscription.acknowledge();
}
if (hasDownstream) {
this.unsubscription.tryEmitNext(unSubscription);
this.unsubscription.emitNext(unSubscription, Reactors.emitFailureHandler());
}
});
}

View File

@ -16,6 +16,7 @@ import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@ -121,22 +122,27 @@ public class DeviceGatewayHelper {
.createOrUpdateSession(childrenId,
children,
child -> Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child)),
Mono::empty));
Mono::empty)
.doOnNext(session -> {
if (session.isWrapFrom(ChildrenDeviceSession.class)) {
ChildrenDeviceSession childrenSession = session.unwrap(ChildrenDeviceSession.class);
//网关发生变化,替换新的上级会话
if (!Objects.equals(deviceId, childrenSession.getParent().getDeviceId())) {
childrenSession.replaceWith(parentSession);
}
}
}));
//子设备注册
if (isDoRegister(children)) {
return Mono
//延迟2秒因为自动注册是异步的,收到消息后并不能保证马上可以注册成功.
.delay(Duration.ofSeconds(2))
.then(registry
.getDevice(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals))
.flatMap(ignore -> sessionHandler))
return this
.getDeviceForRegister(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals))
.flatMap(ignore -> sessionHandler)
.then();
}
return sessionHandler.then();
@ -207,7 +213,7 @@ public class DeviceGatewayHelper {
}
if (doHandle) {
then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt));
then = messageHandler.handleMessage(null, message).then(then);
}
return this
@ -260,6 +266,15 @@ public class DeviceGatewayHelper {
.flatMap(Function.identity());
}
private Mono<DeviceOperator> getDeviceForRegister(String deviceId) {
return registry
.getDevice(deviceId)
.switchIfEmpty(Mono.defer(() -> Mono
//延迟2秒因为自动注册是异步的,收到消息后并不能保证马上可以注册成功.
.delay(Duration.ofSeconds(2))
.then(registry.getDevice(deviceId))));
}
private Mono<DeviceSession> createNewSession(String deviceId,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
@ -298,7 +313,7 @@ public class DeviceGatewayHelper {
private Mono<DeviceSession> updateSession0(DeviceSession session,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) {
Mono<Void> after = null;
Mono<DeviceSession> after = null;
//消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话
if (isNewKeeOnline(session, message)) {
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
@ -310,14 +325,13 @@ public class DeviceGatewayHelper {
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
after = sessionBuilder
.apply(session.getOperator())
.map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)))
.then();
.map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)));
}
applySessionKeepaliveTimeout(message, session);
session.keepAlive();
return after == null
? Mono.just(session)
: after.thenReturn(session);
: after;
}
private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) {
@ -348,7 +362,11 @@ public class DeviceGatewayHelper {
//判断保持在线的会话是否以及丢失(服务重启后可能出现)
private static boolean isKeeOnlineLost(DeviceSession session) {
return session instanceof KeepOnlineSession && session.isWrapFrom(LostDeviceSession.class);
if (!session.isWrapFrom(KeepOnlineSession.class)) {
return false;
}
return session.isWrapFrom(LostDeviceSession.class)
|| !session.unwrap(KeepOnlineSession.class).getParent().isAlive();
}
//判断是否为设备注册

View File

@ -1,5 +1,7 @@
package org.jetlinks.community.network.tcp.client;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
@ -8,15 +10,15 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.message.codec.EncodedMessage;
import reactor.core.publisher.EmitterProcessor;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import java.net.InetSocketAddress;
import java.net.SocketException;
@ -24,28 +26,29 @@ import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
@Slf4j
public class VertxTcpClient implements TcpClient {
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Getter
private final String id;
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final boolean serverClient;
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Setter
private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
private volatile long lastKeepAliveTime = System.currentTimeMillis();
public VertxTcpClient(String id, boolean serverClient) {
this.id = id;
this.serverClient = serverClient;
}
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final Sinks.Many<TcpMessage> sink = Reactors.createMany();
private final boolean serverClient;
@Override
public void keepAlive() {
@ -72,15 +75,18 @@ public class VertxTcpClient implements TcpClient {
@Override
public Mono<Void> sendMessage(EncodedMessage message) {
return Mono
.create((sink) -> {
.<Void>create((sink) -> {
if (socket == null) {
sink.error(new SocketException("socket closed"));
return;
}
Buffer buffer = Buffer.buffer(message.getPayload());
ByteBuf buf = message.getPayload();
Buffer buffer = Buffer.buffer(buf);
int len = buffer.length();
socket.write(buffer, r -> {
keepAlive();
ReferenceCountUtil.safeRelease(buf);
if (r.succeeded()) {
keepAlive();
sink.success();
} else {
sink.error(r.cause());
@ -111,23 +117,18 @@ public class VertxTcpClient implements TcpClient {
return true;
}
/**
* 接收TCP消息
*
* @param message TCP消息
*/
public VertxTcpClient(String id,boolean serverClient) {
this.id = id;
this.serverClient = serverClient;
}
protected void received(TcpMessage message) {
if (processor.getPending() > processor.getBufferSize() / 2) {
log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString());
return;
}
sink.next(message);
sink.emitNext(message,Reactors.RETRY_NON_SERIALIZED);
}
@Override
public Flux<TcpMessage> subscribe() {
return processor
.map(Function.identity());
return sink.asFlux();
}
private void execute(Runnable runnable) {
@ -144,7 +145,7 @@ public class VertxTcpClient implements TcpClient {
return null;
}
SocketAddress socketAddress = socket.remoteAddress();
return new InetSocketAddress(socketAddress.host(), socketAddress.port());
return InetSocketAddress.createUnresolved(socketAddress.host(), socketAddress.port());
}
@Override
@ -154,6 +155,9 @@ public class VertxTcpClient implements TcpClient {
@Override
public void shutdown() {
if (socket == null) {
return;
}
log.debug("tcp client [{}] disconnect", getId());
synchronized (this) {
if (null != client) {
@ -174,7 +178,7 @@ public class VertxTcpClient implements TcpClient {
}
disconnectListener.clear();
if (serverClient) {
processor.onComplete();
sink.tryEmitComplete();
}
}
@ -186,11 +190,6 @@ public class VertxTcpClient implements TcpClient {
this.client = client;
}
/**
* 设置客户端消息解析器
*
* @param payloadParser 消息解析器
*/
public void setRecordParser(PayloadParser payloadParser) {
synchronized (this) {
if (null != this.payloadParser && this.payloadParser != payloadParser) {
@ -199,18 +198,10 @@ public class VertxTcpClient implements TcpClient {
this.payloadParser = payloadParser;
this.payloadParser
.handlePayload()
.onErrorContinue((err, res) -> {
log.error(err.getMessage(), err);
})
.subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
}
}
/**
* socket处理
*
* @param socket socket
*/
public void setSocket(NetSocket socket) {
synchronized (this) {
Objects.requireNonNull(payloadParser);
@ -222,12 +213,12 @@ public class VertxTcpClient implements TcpClient {
.handler(buffer -> {
if (log.isDebugEnabled()) {
log.debug("handle tcp client[{}] payload:[{}]",
socket.remoteAddress(),
Hex.encodeHexString(buffer.getBytes()));
socket.remoteAddress(),
Hex.encodeHexString(buffer.getBytes()));
}
keepAlive();
payloadParser.handle(buffer);
if (this.socket != socket) {
if (this.socket != null && this.socket != socket) {
log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
socket.close();
}

View File

@ -61,7 +61,7 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
netClient.connect(properties.getPort(), properties.getHost(), result -> {
if (result.succeeded()) {
log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties));
client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties).get());
client.setSocket(result.result());
} else {
log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause());

View File

@ -1,5 +1,6 @@
package org.jetlinks.community.network.tcp.device;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
@ -10,6 +11,7 @@ import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceOperator;
@ -52,6 +54,12 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
private Disposable disposable;
//连接检查超时时间,超过时间连接没有被正确处理返回会话,将被自动断开连接
@Setter
private Duration connectCheckTimeout = TimeUtils.parse(
System.getProperty("gateway.tcp.network.connect-check-timeout", "10s"));
private final DeviceGatewayHelper helper;
public TcpServerDeviceGateway(String id,
@ -95,6 +103,7 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
final InetSocketAddress address;
Disposable legalityChecker;
TcpConnection(TcpClient client) {
this.client = client;
@ -111,6 +120,20 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
});
monitor.connected();
sessionRef.set(new UnknownTcpDeviceSession(client.getId(), client, getTransport()));
legalityChecker = Schedulers
.parallel()
.schedule(this::checkLegality, connectCheckTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
public void checkLegality() {
//超过时间还未获取到任何设备则认为连接不合法自动断开连接
if ((sessionRef.get() instanceof UnknownTcpDeviceSession)) {
log.info("tcp [{}] connection is illegal, close it.", address);
try {
client.disconnect();
} catch (Throwable ignore) {
}
}
}
Mono<Void> accept() {
@ -157,16 +180,22 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
}
Mono<DeviceMessage> handleDeviceMessage(DeviceMessage message) {
Disposable checker = legalityChecker;
if (checker != null) {
checker.dispose();
legalityChecker = null;
}
monitor.receivedMessage();
return helper
.handleDeviceMessage(message,
device -> new TcpDeviceSession(device, client, getTransport(), monitor),
session -> {
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
deviceSession.setClient(client);
sessionRef.set(deviceSession);
},
() -> log.warn("TCP{}: The device[{}] in the message body does not exist:{}", address, message.getDeviceId(), message)
.handleDeviceMessage(
message,
device -> new TcpDeviceSession(device, client, getTransport(), monitor),
session -> {
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
deviceSession.setClient(client);
sessionRef.set(deviceSession);
},
() -> log.warn("TCP{}: The device[{}] in the message body does not exist:{}", address, message.getDeviceId(), message)
)
.thenReturn(message);
}

View File

@ -9,14 +9,16 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
@Component
public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPostProcessor {
private Map<PayloadParserType, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
private final Map<PayloadParserType, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
public DefaultPayloadParserBuilder(){
register(new FixLengthPayloadParserBuilder());
@ -25,9 +27,10 @@ public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPo
register(new DirectPayloadParserBuilder());
}
@Override
public PayloadParser build(PayloadParserType type, ValueObject configuration) {
public Supplier<PayloadParser> build(PayloadParserType type, ValueObject configuration) {
return Optional.ofNullable(strategyMap.get(type))
.map(builder -> builder.build(configuration))
.map(builder -> builder.buildLazy(configuration))
.orElseThrow(() -> new UnsupportedOperationException("unsupported parser:" + type));
}
@ -36,7 +39,7 @@ public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPo
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
public Object postProcessAfterInitialization(@Nonnull Object bean,@Nonnull String beanName) throws BeansException {
if (bean instanceof PayloadParserBuilderStrategy) {
register(((PayloadParserBuilderStrategy) bean));
}

View File

@ -1,28 +1,32 @@
package org.jetlinks.community.network.tcp.parser;
import io.vertx.core.buffer.Buffer;
import reactor.core.publisher.EmitterProcessor;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.function.Function;
/**
* 不处理直接返回数据包
*
* @author zhouhao
* @since 1.0
*/
public class DirectRecordParser implements PayloadParser {
EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
private final Sinks.Many<Buffer> sink = Reactors.createMany();
@Override
public void handle(Buffer buffer) {
processor.onNext(buffer);
sink.emitNext(buffer, Reactors.emitFailureHandler());
}
@Override
public Flux<Buffer> handlePayload() {
return processor.map(Function.identity());
return sink.asFlux();
}
@Override
public void close() {
processor.onComplete();
sink.emitComplete(Reactors.emitFailureHandler());
}
}

View File

@ -7,9 +7,9 @@ import reactor.core.publisher.Flux;
* 用于处理TCP粘拆包的解析器,通常一个客户端对应一个解析器.
*
* @author zhouhao
* @see org.jetlinks.community.network.tcp.parser.strateies.PipePayloadParser
* @see org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder
* @see org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder
* @see PipePayloadParser
* @see FixLengthPayloadParserBuilder
* @see DelimitedPayloadParserBuilder
* @since 1.0
*/
public interface PayloadParser {

View File

@ -2,8 +2,23 @@ package org.jetlinks.community.network.tcp.parser;
import org.jetlinks.community.ValueObject;
import java.util.function.Supplier;
/**
* 解析器构造器用于根据解析器类型和配置信息构造对应的解析器
*
* @author zhouhao
* @since 1.0
*/
public interface PayloadParserBuilder {
PayloadParser build(PayloadParserType type, ValueObject configuration);
/**
* 构造解析器
*
* @param type 解析器类型
* @param configuration 配置信息
* @return 解析器
*/
Supplier<PayloadParser> build(PayloadParserType type, ValueObject configuration);
}

View File

@ -1,9 +1,36 @@
package org.jetlinks.community.network.tcp.parser;
import org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder;
import org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder;
import org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder;
import org.jetlinks.community.ValueObject;
import java.util.function.Supplier;
/**
* 解析器构造器策略用于实现不同类型的解析器构造逻辑
*
* @author zhouhao
* @since 1.0
* @see FixLengthPayloadParserBuilder
* @see DelimitedPayloadParserBuilder
* @see ScriptPayloadParserBuilder
*/
public interface PayloadParserBuilderStrategy {
/**
* @return 解析器类型
*/
PayloadParserType getType();
PayloadParser build(ValueObject config);
/**
* 构造解析器
*
* @param config 配置信息
* @return 解析器
*/
Supplier<PayloadParser> buildLazy(ValueObject config);
default PayloadParser build(ValueObject config){
return buildLazy(config).get();
}
}

View File

@ -5,6 +5,7 @@ import lombok.Getter;
import org.hswebframework.web.dict.Dict;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.community.network.tcp.parser.strateies.PipePayloadParser;
import org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder;
@Getter
@AllArgsConstructor
@ -18,7 +19,7 @@ public enum PayloadParserType implements EnumDict<String> {
DELIMITED("分隔符"),
/**
* @see org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder
* @see ScriptPayloadParserBuilder
* @see PipePayloadParser
*/
SCRIPT("自定义脚本")

View File

@ -1,10 +1,21 @@
package org.jetlinks.community.network.tcp.parser.strateies;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.text.StringEscapeUtils;
import org.jetlinks.community.ValueObject;
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import org.jetlinks.community.ValueObject;
import java.util.function.Supplier;
/**
* 以分隔符读取数据包
*
* @author zhouhao
* @since 1.0
*/
public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {
@Override
public PayloadParserType getType() {
@ -12,12 +23,22 @@ public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {
}
@Override
protected RecordParser createParser(ValueObject config) {
@SneakyThrows
protected Supplier<RecordParser> createParser(ValueObject config) {
return RecordParser.newDelimited(StringEscapeUtils.unescapeJava(
config
.getString("delimited")
.orElseThrow(() -> new IllegalArgumentException("delimited can not be null"))));
String delimited = config
.getString("delimited")
.map(String::trim)
.orElseThrow(() -> new IllegalArgumentException("delimited can not be null"));
if (delimited.startsWith("0x")) {
byte[] hex = Hex.decodeHex(delimited.substring(2));
return () -> RecordParser
.newDelimited(Buffer.buffer(hex));
}
return () -> RecordParser.newDelimited(StringEscapeUtils.unescapeJava(delimited));
}

View File

@ -1,11 +1,13 @@
package org.jetlinks.community.network.tcp.parser.strateies;
import lombok.SneakyThrows;
import org.jetlinks.community.ValueObject;
import org.jetlinks.community.network.tcp.parser.DirectRecordParser;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import org.jetlinks.community.ValueObject;
import java.util.function.Supplier;
public class DirectPayloadParserBuilder implements PayloadParserBuilderStrategy {
@ -16,7 +18,7 @@ public class DirectPayloadParserBuilder implements PayloadParserBuilderStrategy
@Override
@SneakyThrows
public PayloadParser build(ValueObject config) {
return new DirectRecordParser();
public Supplier<PayloadParser> buildLazy(ValueObject config) {
return DirectRecordParser::new;
}
}

View File

@ -1,9 +1,17 @@
package org.jetlinks.community.network.tcp.parser.strateies;
import io.vertx.core.parsetools.RecordParser;
import org.jetlinks.community.ValueObject;
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import org.jetlinks.community.ValueObject;
import java.util.function.Supplier;
/**
* 固定长度解析器构造器,每次读取固定长度的数据包
*
* @author zhouhao
* @since 1.0
*/
public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {
@Override
public PayloadParserType getType() {
@ -11,9 +19,11 @@ public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {
}
@Override
protected RecordParser createParser(ValueObject config) {
return RecordParser.newFixed(config.getInt("size")
.orElseThrow(() -> new IllegalArgumentException("size can not be null")));
protected Supplier<RecordParser> createParser(ValueObject config) {
int size = config.getInt("size")
.orElseThrow(() -> new IllegalArgumentException("size can not be null"));
return () -> RecordParser.newFixed(size);
}

View File

@ -4,13 +4,14 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import reactor.core.publisher.EmitterProcessor;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@ -28,11 +29,12 @@ import java.util.function.Function;
@Slf4j
public class PipePayloadParser implements PayloadParser {
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
private final static AtomicIntegerFieldUpdater<PipePayloadParser> CURRENT_PIPE =
AtomicIntegerFieldUpdater.newUpdater(PipePayloadParser.class, "currentPipe");
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final Sinks.Many<Buffer> sink = Reactors.createMany();
private final List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
private final List<BiConsumer<Buffer, PipePayloadParser>> pipe = new CopyOnWriteArrayList<>();
private final List<Buffer> result = new CopyOnWriteArrayList<>();
@ -42,7 +44,7 @@ public class PipePayloadParser implements PayloadParser {
private Consumer<RecordParser> firstInit;
private final AtomicInteger currentPipe = new AtomicInteger();
private volatile int currentPipe;
public Buffer newBuffer() {
return Buffer.buffer();
@ -57,10 +59,12 @@ public class PipePayloadParser implements PayloadParser {
}
public PipePayloadParser handler(Consumer<Buffer> handler) {
pipe.add(handler);
pipe.add((buffer, parser) -> handler.accept(buffer));
return this;
}
public PipePayloadParser delimited(String delimited) {
if (recordParser == null) {
setParser(RecordParser.newDelimited(delimited));
@ -90,22 +94,22 @@ public class PipePayloadParser implements PayloadParser {
return this;
}
private Consumer<Buffer> getNextHandler() {
int i = currentPipe.getAndIncrement();
private BiConsumer<Buffer, PipePayloadParser> getNextHandler() {
int i = CURRENT_PIPE.getAndIncrement(this);
if (i < pipe.size()) {
return pipe.get(i);
}
currentPipe.set(0);
CURRENT_PIPE.set(this, 0);
return pipe.get(0);
}
private void setParser(RecordParser parser) {
this.recordParser = parser;
this.recordParser.handler(buffer -> getNextHandler().accept(buffer));
this.recordParser.handler(buffer -> getNextHandler().accept(buffer, this));
}
public PipePayloadParser complete() {
currentPipe.set(0);
CURRENT_PIPE.set(this, 0);
if (recordParser != null) {
firstInit.accept(recordParser);
}
@ -115,7 +119,7 @@ public class PipePayloadParser implements PayloadParser {
buffer.appendBuffer(buf);
}
this.result.clear();
sink.next(buffer);
sink.emitNext(buffer, Reactors.emitFailureHandler());
}
return this;
@ -138,13 +142,13 @@ public class PipePayloadParser implements PayloadParser {
}
Buffer buf = directMapper.apply(buffer);
if (null != buf) {
sink.next(buf);
sink.emitNext(buf, Reactors.emitFailureHandler());
}
}
@Override
public Flux<Buffer> handlePayload() {
return processor.map(Function.identity());
return sink.asFlux();
}
@Override
@ -155,8 +159,8 @@ public class PipePayloadParser implements PayloadParser {
@Override
public void close() {
processor.onComplete();
currentPipe.set(0);
sink.tryEmitComplete();
CURRENT_PIPE.set(this, 0);
this.result.clear();
}

View File

@ -1,16 +1,17 @@
package org.jetlinks.community.network.tcp.parser.strateies;
import lombok.SneakyThrows;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.expands.script.engine.DynamicScriptEngine;
import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
import org.jetlinks.community.ValueObject;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import org.jetlinks.community.ValueObject;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy {
@Override
@ -20,24 +21,33 @@ public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy
@Override
@SneakyThrows
public PayloadParser build(ValueObject config) {
public Supplier<PayloadParser> buildLazy(ValueObject config) {
String script = config.getString("script")
.orElseThrow(() -> new IllegalArgumentException("script不能为空"));
.orElseThrow(() -> new IllegalArgumentException("script不能为空"));
String lang = config.getString("lang")
.orElseThrow(() -> new IllegalArgumentException("lang不能为空"));
.orElse("js");
DynamicScriptEngine engine = DynamicScriptEngineFactory.getEngine(lang);
if (engine == null) {
throw new IllegalArgumentException("不支持的脚本:" + lang);
}
PipePayloadParser parser = new PipePayloadParser();
String id = DigestUtils.md5Hex(script);
if (!engine.compiled(id)) {
engine.compile(id, script);
}
Map<String, Object> ctx = new HashMap<>();
ctx.put("parser", parser);
engine.execute(id, ctx).getIfSuccess();
return parser;
return () -> {
PipePayloadParser parser = new PipePayloadParser();
Map<String, Object> ctx = new HashMap<>();
ctx.put("parser", parser);
try {
engine.execute(id, ctx).getIfSuccess();
} catch (Throwable e) {
throw new RuntimeException(e);
}
return parser;
};
}
}

View File

@ -2,32 +2,31 @@ package org.jetlinks.community.network.tcp.parser.strateies;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import org.jetlinks.community.ValueObject;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import reactor.core.publisher.EmitterProcessor;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.ValueObject;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;
import java.util.function.Function;
import java.util.function.Supplier;
public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderStrategy {
@Override
public abstract PayloadParserType getType();
protected abstract RecordParser createParser(ValueObject config);
protected abstract Supplier<RecordParser> createParser(ValueObject config);
@Override
public PayloadParser build(ValueObject config) {
return new RecordPayloadParser(() -> createParser(config));
public Supplier<PayloadParser> buildLazy(ValueObject config) {
Supplier<RecordParser> parser = createParser(config);
return () -> new RecordPayloadParser(parser);
}
static class RecordPayloadParser implements PayloadParser {
private final Supplier<RecordParser> recordParserSupplier;
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final Sinks.Many<Buffer> sink = Reactors.createMany();
private RecordParser recordParser;
@ -43,18 +42,20 @@ public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderS
@Override
public Flux<Buffer> handlePayload() {
return processor.map(Function.identity());
return sink.asFlux();
}
@Override
public void close() {
processor.onComplete();
sink.emitComplete(Reactors.emitFailureHandler());
}
@Override
public void reset() {
this.recordParser = recordParserSupplier.get();
this.recordParser.handler(sink::next);
this.recordParser.handler(payload -> {
sink.emitNext(payload, Reactors.emitFailureHandler());
});
}
}

View File

@ -70,7 +70,7 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
}
// 根据解析类型配置数据解析器
payloadParserBuilder.build(properties.getParserType(), properties);
tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), properties));
tcpServer.setParserSupplier( payloadParserBuilder.build(properties.getParserType(), properties));
tcpServer.setServer(instances);
tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
// 针对JVM做的多路复用优化

View File

@ -10,9 +10,11 @@ import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.client.VertxTcpClient;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;
import java.time.Duration;
import java.util.Collection;
@ -28,8 +30,8 @@ public class VertxTcpServer implements TcpServer {
@Getter
private final String id;
private final EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
private final FluxSink<TcpClient> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final Sinks.Many<TcpClient> sink = Reactors.createMany(Integer.MAX_VALUE, false);
Collection<NetServer> tcpServers;
private Supplier<PayloadParser> parserSupplier;
@Setter
@ -41,8 +43,7 @@ public class VertxTcpServer implements TcpServer {
@Override
public Flux<TcpClient> handleConnection() {
return processor
.map(Function.identity());
return sink.asFlux();
}
private void execute(Runnable runnable) {
@ -80,7 +81,7 @@ public class VertxTcpServer implements TcpServer {
* @param socket socket
*/
protected void acceptTcpConnection(NetSocket socket) {
if (!processor.hasDownstreams()) {
if (sink.currentSubscriberCount() == 0) {
log.warn("not handler for tcp client[{}]", socket.remoteAddress());
socket.close();
return;
@ -101,8 +102,7 @@ public class VertxTcpServer implements TcpServer {
// 调用坐标 org.jetlinks.community.network.tcp.server.TcpServerProvider.initTcpServer
client.setRecordParser(parserSupplier.get());
client.setSocket(socket);
// client放进了发射器
sink.next(client);
sink.emitNext(client, Reactors.emitFailureHandler());
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
} catch (Exception e) {
log.error("create tcp server client error", e);
@ -118,6 +118,7 @@ public class VertxTcpServer implements TcpServer {
@Override
public void shutdown() {
if (null != tcpServers) {
log.debug("close tcp server :[{}]", id);
for (NetServer tcpServer : tcpServers) {
execute(tcpServer::close);
}

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.standalone.web;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Getter
@Setter
@ConfigurationProperties(prefix = "api")
@Component
public class ApiInfoProperties {
@Schema(description = "api根路径")
private String basePath;
@Schema(description = "api地址信息")
private Map<String, String> urls = new ConcurrentHashMap<>();
}

View File

@ -1,20 +1,54 @@
package org.jetlinks.community.standalone.web;
import org.hswebframework.web.authorization.annotation.Authorize;
import org.jetlinks.community.Version;
import lombok.Getter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@RequestMapping("/system")
@RestController
public class SystemInfoController {
@GetMapping("/version")
@Authorize(ignore = true)
public Mono<Version> getVersion() {
return Mono.just(Version.current);
private static final Map<String, Object> versionResponse;
private final ApiInfoProperties infoProperties;
static {
versionResponse = new HashMap<>();
versionResponse.put("result", new Version());
versionResponse.put("status", 200);
versionResponse.put("code", "success");
}
public SystemInfoController(ApiInfoProperties infoProperties) {
this.infoProperties = infoProperties;
}
@GetMapping("/version")
public Mono<Map<String, Object>> getVersion() {
return Mono.just(versionResponse);
}
@GetMapping("/apis")
public Mono<Map<String, Object>> getApis() {
Map<String, Object> map = new HashMap<>();
map.put("result", infoProperties);
map.put("status", 200);
map.put("code", "success");
return Mono.just(map);
}
@Getter
public static class Version {
private final String edition = "pro";
private final String version = "1.1.0-SNAPSHOT";
private final String mode = "cloud";
}
}

View File

@ -18,10 +18,10 @@ spring:
elasticsearch:
client:
reactive:
endpoints: ${elasticsearch.client.host}:${elasticsearch.client.port}
endpoints: localhost:9201
max-in-memory-size: 100MB
socket-timeout: ${elasticsearch.client.socket-timeout}
connection-timeout: ${elasticsearch.client.socket-timeout}
socket-timeout: 5000
connection-timeout: 8000
easyorm:
default-schema: PUBLIC # 数据库默认的schema
dialect: h2 #数据库方言
@ -31,13 +31,6 @@ elasticsearch:
data-path: ./data/elasticsearch
port: 9201
host: 0.0.0.0
client:
host: localhost
port: 9201
max-conn-total: 128
connect-timeout: 5000
socket-timeout: 5000
connection-request-timeout: 8000
index:
default-strategy: time-by-month #默认es的索引按月进行分表, direct则为直接操作索引.
settings:

View File

@ -109,6 +109,9 @@ hsweb:
file:
manager:
storage-base-path: ./data/files
api:
# 访问系统接口的根地址
base-path: http://127.0.0.1:${server.port}
jetlinks:
server-id: ${spring.application.name}:${server.port} #设备服务网关服务ID,不同服务请设置不同的ID

12
pom.xml
View File

@ -19,10 +19,10 @@
<spring.boot.version>2.5.13</spring.boot.version>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<hsweb.framework.version>4.0.15-SNAPSHOT</hsweb.framework.version>
<easyorm.version>4.1.0-SNAPSHOT</easyorm.version>
<hsweb.framework.version>4.0.15</hsweb.framework.version>
<easyorm.version>4.1.0</easyorm.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<jetlinks.version>1.2.0-SNAPSHOT</jetlinks.version>
<jetlinks.version>1.2.0</jetlinks.version>
<r2dbc.version>Borca-SR1</r2dbc.version>
<netty.version>4.1.74.Final</netty.version>
<elasticsearch.version>7.11.2</elasticsearch.version>
@ -521,7 +521,7 @@
<repository>
<id>hsweb-nexus</id>
<name>Nexus Release Repository</name>
<url>https://nexus.hsweb.me/content/groups/public/</url>
<url>https://nexus.jetlinks.cn/content/groups/public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
@ -534,12 +534,12 @@
<repository>
<id>releases</id>
<name>Nexus Release Repository</name>
<url>https://nexus.hsweb.me/content/repositories/releases/</url>
<url>https://nexus.jetlinks.cn/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>https://nexus.hsweb.me/content/repositories/snapshots/</url>
<url>https://nexus.jetlinks.cn/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>