Compare commits
14 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
477ecd911a | |
|
|
29174496b6 | |
|
|
9711f857d2 | |
|
|
6c29d45d67 | |
|
|
c78e3795a9 | |
|
|
f3a5fb2bff | |
|
|
101aa5bfa7 | |
|
|
f524443ba3 | |
|
|
44f187d5db | |
|
|
393930e72b | |
|
|
7075c29740 | |
|
|
7a0ddf7aae | |
|
|
9c7a7a5afb | |
|
|
b459f66925 |
|
|
@ -1 +1 @@
|
|||
distributionUrl=https://downloads.apache.org/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip
|
||||
distributionUrl=https://archive.apache.org/dist/maven/maven-3/3.9.3/binaries/apache-maven-3.9.3-bin.zip
|
||||
|
|
@ -1,10 +1,13 @@
|
|||
package org.jetlinks.community;
|
||||
|
||||
import lombok.Generated;
|
||||
import org.jetlinks.core.config.ConfigKey;
|
||||
import org.jetlinks.core.message.HeaderKey;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* @author wangzheng
|
||||
|
|
@ -24,8 +27,14 @@ public interface PropertyConstants {
|
|||
return Optional.ofNullable((T) map.get(key.getKey()));
|
||||
}
|
||||
|
||||
@Generated
|
||||
interface Key<V> extends ConfigKey<V>, HeaderKey<V> {
|
||||
|
||||
@Override
|
||||
default Type getValueType() {
|
||||
return ConfigKey.super.getValueType();
|
||||
}
|
||||
|
||||
@Override
|
||||
default Class<V> getType() {
|
||||
return ConfigKey.super.getType();
|
||||
|
|
@ -45,5 +54,52 @@ public interface PropertyConstants {
|
|||
};
|
||||
}
|
||||
|
||||
static <T> Key<T> of(String key, T defaultValue) {
|
||||
return new Key<T>() {
|
||||
@Override
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getDefaultValue() {
|
||||
return defaultValue;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static <T> Key<T> of(String key, Supplier<T> defaultValue) {
|
||||
return new Key<T>() {
|
||||
@Override
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getDefaultValue() {
|
||||
return defaultValue.get();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static <T> Key<T> of(String key, Supplier<T> defaultValue, Type type) {
|
||||
return new Key<T>() {
|
||||
@Override
|
||||
public Type getValueType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getDefaultValue() {
|
||||
return defaultValue.get();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -98,9 +98,9 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|||
public void shutdown() {
|
||||
super.shutdown();
|
||||
Flux.fromIterable(localSessions.values())
|
||||
.flatMap(Function.identity())
|
||||
.filter(session -> session.isWrapFrom(PersistentSession.class))
|
||||
.map(session -> session.unwrap(PersistentSession.class))
|
||||
.filter(ref -> ref.loaded != null)
|
||||
.filter(ref -> ref.loaded.isWrapFrom(PersistentSession.class))
|
||||
.map(ref -> ref.loaded.unwrap(PersistentSession.class))
|
||||
.as(this::tryPersistent)
|
||||
.block();
|
||||
repository.store.compactMoveChunks();
|
||||
|
|
@ -138,11 +138,14 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|||
}
|
||||
|
||||
Mono<Void> resumeSession(PersistentSessionEntity entity) {
|
||||
return entity
|
||||
return entity
|
||||
.toSession(registry.get())
|
||||
.doOnNext(session -> {
|
||||
log.debug("resume session[{}]", session.getDeviceId());
|
||||
localSessions.putIfAbsent(session.getDeviceId(), Mono.just(session));
|
||||
localSessions.putIfAbsent(session.getDeviceId(),
|
||||
new DeviceSessionRef(session.getDeviceId(),
|
||||
this,
|
||||
session));
|
||||
})
|
||||
.onErrorResume((err) -> {
|
||||
log.debug("resume session[{}] error", entity.getDeviceId(), err);
|
||||
|
|
|
|||
|
|
@ -4,18 +4,20 @@ import io.netty.buffer.ByteBuf;
|
|||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.scalecube.services.annotations.Service;
|
||||
import io.scalecube.services.annotations.ServiceMethod;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
||||
import org.hswebframework.web.crud.events.EntityDeletedEvent;
|
||||
import org.hswebframework.web.exception.BusinessException;
|
||||
import org.hswebframework.web.exception.NotFoundException;
|
||||
import org.hswebframework.web.id.IDGenerator;
|
||||
import org.jetlinks.core.rpc.RpcManager;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.core.io.FileSystemResource;
|
||||
import org.springframework.core.io.buffer.*;
|
||||
import org.springframework.http.codec.multipart.FilePart;
|
||||
|
|
@ -28,12 +30,13 @@ import java.nio.file.Path;
|
|||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.security.MessageDigest;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
||||
@Slf4j
|
||||
public class ClusterFileManager implements FileManager {
|
||||
|
||||
private final FileProperties properties;
|
||||
|
|
@ -55,8 +58,8 @@ public class ClusterFileManager implements FileManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<FileInfo> saveFile(FilePart filePart) {
|
||||
return saveFile(filePart.filename(), filePart.content());
|
||||
public Mono<FileInfo> saveFile(FilePart filePart, FileOption... options) {
|
||||
return saveFile(filePart.filename(), filePart.content(), options);
|
||||
}
|
||||
|
||||
private DataBuffer updateDigest(MessageDigest digest, DataBuffer dataBuffer) {
|
||||
|
|
@ -66,7 +69,7 @@ public class ClusterFileManager implements FileManager {
|
|||
return dataBuffer;
|
||||
}
|
||||
|
||||
public Mono<FileInfo> doSaveFile(String name, Flux<DataBuffer> stream) {
|
||||
public Mono<FileInfo> doSaveFile(String name, Flux<DataBuffer> stream, FileOption... options) {
|
||||
LocalDate now = LocalDate.now();
|
||||
FileInfo fileInfo = new FileInfo();
|
||||
fileInfo.setId(IDGenerator.MD5.generate());
|
||||
|
|
@ -85,18 +88,20 @@ public class ClusterFileManager implements FileManager {
|
|||
.map(buffer -> updateDigest(md5, updateDigest(sha256, buffer)))
|
||||
.as(buf -> DataBufferUtils
|
||||
.write(buf, path,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.CREATE_NEW,
|
||||
StandardOpenOption.TRUNCATE_EXISTING))
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.CREATE_NEW,
|
||||
StandardOpenOption.TRUNCATE_EXISTING))
|
||||
.then(Mono.defer(() -> {
|
||||
File savedFile = Paths.get(storageBasePath, storagePath).toFile();
|
||||
if (!savedFile.exists()) {
|
||||
return Mono.error(new BusinessException("error.file_storage_failed"));
|
||||
}
|
||||
fileInfo.withAccessKey(IDGenerator.MD5.generate());
|
||||
fileInfo.setMd5(ByteBufUtil.hexDump(md5.digest()));
|
||||
fileInfo.setSha256(ByteBufUtil.hexDump(sha256.digest()));
|
||||
fileInfo.setLength(savedFile.length());
|
||||
fileInfo.setCreateTime(System.currentTimeMillis());
|
||||
fileInfo.setOptions(options);
|
||||
FileEntity entity = FileEntity.of(fileInfo, storagePath, serverNodeId);
|
||||
return repository
|
||||
.insert(entity)
|
||||
|
|
@ -105,8 +110,26 @@ public class ClusterFileManager implements FileManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream) {
|
||||
return doSaveFile(name, stream);
|
||||
public Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream, FileOption... options) {
|
||||
return doSaveFile(name, stream, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<FileInfo> getFileByMd5(String md5) {
|
||||
return repository
|
||||
.createQuery()
|
||||
.where(FileEntity::getMd5, md5)
|
||||
.fetchOne()
|
||||
.map(FileEntity::toInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<FileInfo> getFileBySha256(String sha256) {
|
||||
return repository
|
||||
.createQuery()
|
||||
.where(FileEntity::getSha256, sha256)
|
||||
.fetchOne()
|
||||
.map(FileEntity::toInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -119,9 +142,9 @@ public class ClusterFileManager implements FileManager {
|
|||
private Flux<DataBuffer> readFile(String filePath, long position) {
|
||||
return DataBufferUtils
|
||||
.read(new FileSystemResource(Paths.get(properties.getStorageBasePath(), filePath)),
|
||||
position,
|
||||
bufferFactory,
|
||||
(int) properties.getReadBufferSize().toBytes())
|
||||
position,
|
||||
bufferFactory,
|
||||
(int) properties.getReadBufferSize().toBytes())
|
||||
.onErrorMap(NoSuchFileException.class, e -> new NotFoundException());
|
||||
}
|
||||
|
||||
|
|
@ -168,6 +191,27 @@ public class ClusterFileManager implements FileManager {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Integer> delete(String id) {
|
||||
return doDelete(id);
|
||||
}
|
||||
|
||||
public Mono<Integer> doDelete(String id) {
|
||||
return repository
|
||||
.deleteById(id);
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void handleDeleteEvent(EntityDeletedEvent<FileEntity> event) {
|
||||
for (FileEntity fileEntity : event.getEntity()) {
|
||||
File file = Paths.get(properties.getStorageBasePath(), fileEntity.getStoragePath()).toFile();
|
||||
if (file.exists()) {
|
||||
log.debug("delete file: {}", file.getAbsolutePath());
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@AllArgsConstructor
|
||||
private static class DefaultReaderContext implements ReaderContext {
|
||||
private final FileInfo info;
|
||||
|
|
|
|||
|
|
@ -6,10 +6,16 @@ import org.apache.commons.io.FilenameUtils;
|
|||
import org.springframework.http.MediaType;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
public class FileInfo {
|
||||
|
||||
public static final String OTHER_ACCESS_KEY = "accessKey";
|
||||
|
||||
private String id;
|
||||
|
||||
private String name;
|
||||
|
|
@ -28,6 +34,10 @@ public class FileInfo {
|
|||
|
||||
private FileOption[] options;
|
||||
|
||||
private Map<String, Object> others;
|
||||
|
||||
private String accessUrl;
|
||||
|
||||
public MediaType mediaType() {
|
||||
if (!StringUtils.hasText(extension)) {
|
||||
return MediaType.APPLICATION_OCTET_STREAM;
|
||||
|
|
@ -36,6 +46,14 @@ public class FileInfo {
|
|||
case "jpg":
|
||||
case "jpeg":
|
||||
return MediaType.IMAGE_JPEG;
|
||||
case "png":
|
||||
return MediaType.IMAGE_PNG;
|
||||
case "gif":
|
||||
return MediaType.IMAGE_GIF;
|
||||
case "mp4":
|
||||
return MediaType.parseMediaType("video/mp4");
|
||||
case "flv":
|
||||
return MediaType.parseMediaType("video/x-flv");
|
||||
case "text":
|
||||
case "txt":
|
||||
return MediaType.TEXT_PLAIN;
|
||||
|
|
@ -46,10 +64,44 @@ public class FileInfo {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean hasOption(FileOption option){
|
||||
if(options==null){
|
||||
return false;
|
||||
}
|
||||
for (FileOption fileOption : options) {
|
||||
if(fileOption==option){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
public FileInfo withFileName(String fileName) {
|
||||
name = fileName;
|
||||
extension = FilenameUtils.getExtension(fileName);
|
||||
return this;
|
||||
}
|
||||
|
||||
public synchronized FileInfo withOther(String key, Object value) {
|
||||
if (others == null) {
|
||||
others = new HashMap<>();
|
||||
}
|
||||
others.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public FileInfo withAccessKey(String accessKey) {
|
||||
withOther(OTHER_ACCESS_KEY, accessKey);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Optional<String> accessKey() {
|
||||
return Optional
|
||||
.ofNullable(others)
|
||||
.map(map -> map.get(OTHER_ACCESS_KEY))
|
||||
.map(String::valueOf)
|
||||
.filter(StringUtils::hasText);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,23 +7,31 @@ import reactor.core.publisher.Mono;
|
|||
|
||||
import java.util.function.Function;
|
||||
|
||||
|
||||
/**
|
||||
* 文件管理器,统一管理文件信息
|
||||
*/
|
||||
public interface FileManager {
|
||||
|
||||
Mono<FileInfo> saveFile(FilePart filePart);
|
||||
Mono<FileInfo> saveFile(FilePart filePart, FileOption... options);
|
||||
|
||||
Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream);
|
||||
Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream, FileOption... options);
|
||||
|
||||
Mono<FileInfo> getFile(String id);
|
||||
|
||||
Mono<FileInfo> getFileByMd5(String md5);
|
||||
|
||||
Mono<FileInfo> getFileBySha256(String sha256);
|
||||
|
||||
Flux<DataBuffer> read(String id);
|
||||
|
||||
Flux<DataBuffer> read(String id, long position);
|
||||
|
||||
Flux<DataBuffer> read(String id,
|
||||
Function<ReaderContext,Mono<Void>> beforeRead);
|
||||
Function<ReaderContext, Mono<Void>> beforeRead);
|
||||
|
||||
interface ReaderContext{
|
||||
Mono<Integer> delete(String id);
|
||||
|
||||
interface ReaderContext {
|
||||
FileInfo info();
|
||||
|
||||
void position(long position);
|
||||
|
|
|
|||
|
|
@ -1,14 +1,21 @@
|
|||
package org.jetlinks.community.io.file.web;
|
||||
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.hswebframework.web.authorization.Authentication;
|
||||
import org.hswebframework.web.authorization.annotation.Authorize;
|
||||
import org.hswebframework.web.authorization.annotation.DeleteAction;
|
||||
import org.hswebframework.web.authorization.annotation.Resource;
|
||||
import org.hswebframework.web.authorization.exception.AccessDenyException;
|
||||
import org.jetlinks.community.io.file.FileInfo;
|
||||
import org.jetlinks.community.io.file.FileManager;
|
||||
import org.jetlinks.community.io.file.FileOption;
|
||||
import org.springframework.http.ContentDisposition;
|
||||
import org.springframework.http.HttpRange;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.multipart.FilePart;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
|
@ -19,6 +26,8 @@ import java.util.List;
|
|||
@RestController
|
||||
@RequestMapping("/file")
|
||||
@AllArgsConstructor
|
||||
@Resource(id= "file-manager",name = "文件管理")
|
||||
@Tag(name = "需身份认证的文件管理")
|
||||
public class FileManagerController {
|
||||
|
||||
private final FileManager fileManager;
|
||||
|
|
@ -31,7 +40,7 @@ public class FileManagerController {
|
|||
}
|
||||
|
||||
@GetMapping("/{fileId}")
|
||||
@Authorize(merge = false)
|
||||
@Authorize(ignore = true)
|
||||
@Operation(summary = "获取文件")
|
||||
public Mono<Void> read(@PathVariable String fileId,
|
||||
ServerWebExchange exchange) {
|
||||
|
|
@ -40,28 +49,67 @@ public class FileManagerController {
|
|||
.getResponse()
|
||||
.writeWith(fileManager
|
||||
.read(fileId, ctx -> {
|
||||
List<HttpRange> ranges = exchange
|
||||
.getRequest()
|
||||
.getHeaders()
|
||||
.getRange();
|
||||
long position = 0;
|
||||
if (ranges.size() != 0) {
|
||||
position = ranges.get(0).getRangeStart(ctx.info().getLength());
|
||||
Mono<Void> before;
|
||||
//不是公开访问则需要登陆或者使用accessKey
|
||||
if (!ctx.info().hasOption(FileOption.publicAccess)) {
|
||||
String key = exchange.getRequest().getQueryParams().getFirst("accessKey");
|
||||
//请求参数没有accessKey则校验当前用户是否登陆
|
||||
if (!StringUtils.hasText(key)) {
|
||||
before = Authentication
|
||||
.currentReactive()
|
||||
.switchIfEmpty(Mono.error(AccessDenyException::new))
|
||||
.then();
|
||||
} else {
|
||||
//校验accessKey
|
||||
if (ctx.info().accessKey().map(key::equalsIgnoreCase).orElse(false)) {
|
||||
before = Mono.empty();
|
||||
} else {
|
||||
before = Mono.error(AccessDenyException::new);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
before = Mono.empty();
|
||||
}
|
||||
ctx.position(position);
|
||||
MediaType mediaType = ctx.info().mediaType();
|
||||
exchange.getResponse().getHeaders().setContentType(mediaType);
|
||||
exchange.getResponse().getHeaders().setContentLength(ctx.info().getLength());
|
||||
//文件流时下载文件
|
||||
if (mediaType.includes(MediaType.APPLICATION_OCTET_STREAM)) {
|
||||
exchange.getResponse().getHeaders().setContentDisposition(
|
||||
ContentDisposition
|
||||
.builder("attachment")
|
||||
.filename(ctx.info().getName(), StandardCharsets.UTF_8)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
return Mono.empty();
|
||||
|
||||
return before.then(
|
||||
Mono.fromRunnable(() -> {
|
||||
List<HttpRange> ranges = exchange
|
||||
.getRequest()
|
||||
.getHeaders()
|
||||
.getRange();
|
||||
long position = 0;
|
||||
if (ranges.size() != 0) {
|
||||
position = ranges.get(0).getRangeStart(ctx.info().getLength());
|
||||
}
|
||||
ctx.position(position);
|
||||
MediaType mediaType = ctx.info().mediaType();
|
||||
exchange.getResponse().getHeaders().setContentType(mediaType);
|
||||
exchange.getResponse().getHeaders().setContentLength(ctx.info().getLength());
|
||||
exchange.getResponse().getHeaders().add("file-md5", ctx.info().getMd5());
|
||||
exchange.getResponse().getHeaders().add("file-sha256", ctx.info().getSha256());
|
||||
|
||||
//文件流时下载文件
|
||||
if (mediaType.includes(MediaType.APPLICATION_OCTET_STREAM)) {
|
||||
exchange.getResponse().getHeaders().setContentDisposition(
|
||||
ContentDisposition
|
||||
.builder("attachment")
|
||||
.filename(ctx.info().getName(), StandardCharsets.UTF_8)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
|
||||
}));
|
||||
}
|
||||
|
||||
@DeleteMapping("/{fileId}")
|
||||
@DeleteAction
|
||||
@Operation(summary = "删除文件")
|
||||
public Mono<Integer> delete(@PathVariable String fileId) {
|
||||
return fileManager
|
||||
.delete(fileId);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
|
|||
import org.jetlinks.community.network.NetworkType;
|
||||
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
|
||||
import org.jetlinks.community.network.mqtt.server.MqttConnection;
|
||||
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
|
||||
import org.jetlinks.community.network.mqtt.server.MqttServer;
|
||||
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
|
||||
import org.jetlinks.community.utils.SystemUtils;
|
||||
|
|
@ -112,27 +113,25 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||
monitor.rejected();
|
||||
}
|
||||
return isStarted();
|
||||
return true;
|
||||
})
|
||||
.publishOn(Schedulers.parallel())
|
||||
//处理mqtt连接请求
|
||||
.flatMap(this::handleConnection)
|
||||
//处理认证结果
|
||||
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
||||
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
|
||||
.contextWrite(ReactiveLogger.start("network", mqttServer.getId()))
|
||||
.flatMap(connection -> this
|
||||
.handleConnection(connection)
|
||||
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
||||
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()))
|
||||
.onErrorResume(err -> {
|
||||
log.error(err.getMessage(), err);
|
||||
return Mono.empty();
|
||||
}),
|
||||
Integer.MAX_VALUE)
|
||||
.subscribe();
|
||||
|
||||
}
|
||||
|
||||
//处理连接,并进行认证
|
||||
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
||||
//内存不够了
|
||||
if (SystemUtils.memoryIsOutOfWatermark()) {
|
||||
//直接拒绝,响应SERVER_UNAVAILABLE,不再处理此连接
|
||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
return Mono
|
||||
.justOrEmpty(connection.getAuth())
|
||||
.flatMap(auth -> {
|
||||
|
|
@ -170,7 +169,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
//应答SERVER_UNAVAILABLE
|
||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||
}))
|
||||
.subscribeOn(Schedulers.parallel());
|
||||
;
|
||||
}
|
||||
|
||||
//处理认证结果
|
||||
|
|
@ -190,7 +189,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
monitor.totalConnection(counter.sum());
|
||||
|
||||
sessionManager
|
||||
.getSession(deviceId)
|
||||
.getSession(deviceId, false)
|
||||
.flatMap(_tmp -> {
|
||||
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
|
||||
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
|
||||
|
|
@ -219,19 +218,21 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
})
|
||||
.defaultIfEmpty(newSession);
|
||||
})
|
||||
.flatMap(session -> Mono.fromCallable(() -> {
|
||||
.mapNotNull(session->{
|
||||
try {
|
||||
return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
|
||||
} catch (IllegalStateException ignore) {
|
||||
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
|
||||
return null;
|
||||
}
|
||||
}))
|
||||
})
|
||||
.doOnNext(o -> {
|
||||
//监控信息
|
||||
monitor.connected();
|
||||
monitor.totalConnection(counter.sum());
|
||||
});
|
||||
})
|
||||
//会话empty说明注册会话失败?
|
||||
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
|
||||
} else {
|
||||
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
|
||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
|
||||
|
|
@ -253,20 +254,16 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
|
||||
DeviceOperator operator,
|
||||
MqttConnectionSession session) {
|
||||
|
||||
|
||||
return Flux
|
||||
.usingWhen(Mono.just(connection),
|
||||
MqttConnection::handleMessage,
|
||||
MqttConnection::close)
|
||||
//网关暂停或者已停止时,则不处理消息
|
||||
.filter(pb -> isStarted())
|
||||
.doOnNext(msg -> monitor.receivedMessage())
|
||||
.publishOn(Schedulers.parallel())
|
||||
//解码收到的mqtt报文
|
||||
.flatMap(publishing -> this
|
||||
.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
|
||||
//应答MQTT(QoS1,2的场景)
|
||||
.doOnSuccess(s -> publishing.acknowledge())
|
||||
.concatMap(publishing -> this
|
||||
.decodeAndHandleMessage(operator, session, publishing, connection)
|
||||
)
|
||||
//合并遗言消息
|
||||
.mergeWith(
|
||||
|
|
@ -282,30 +279,32 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
MqttConnectionSession session,
|
||||
MqttMessage message,
|
||||
MqttConnection connection) {
|
||||
monitor.receivedMessage();
|
||||
|
||||
return operator
|
||||
.getProtocol()
|
||||
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
|
||||
//解码
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry)))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(msg -> {
|
||||
.concatMap(msg -> {
|
||||
//回填deviceId,有的场景协议包不能或者没有解析出deviceId,则直接使用连接对应的设备id进行填充.
|
||||
if (!StringUtils.hasText(msg.getDeviceId())) {
|
||||
msg.thingId(DeviceThingType.device, operator.getDeviceId());
|
||||
}
|
||||
return this
|
||||
.handleMessage(operator, msg, connection);
|
||||
return this.handleMessage(operator, msg, connection);
|
||||
})
|
||||
.doOnComplete(() -> {
|
||||
if (message instanceof MqttPublishing) {
|
||||
((MqttPublishing) message).acknowledge();
|
||||
}
|
||||
})
|
||||
.doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
|
||||
.as(FluxTracer
|
||||
.create(DeviceTracer.SpanName.decode(operator.getDeviceId()),
|
||||
(span, msg) -> span.setAttribute(DeviceTracer.SpanKey.message, msg
|
||||
.toJson()
|
||||
.toJSONString())))
|
||||
//发生错误不中断流
|
||||
.onErrorResume((err) -> Mono.empty())
|
||||
.then()
|
||||
.subscribeOn(Schedulers.parallel());
|
||||
.onErrorResume((err) -> {
|
||||
log.error("handle mqtt message [{}] error:{}", operator.getDeviceId(), message, err);
|
||||
return Mono.empty();
|
||||
})
|
||||
.then();
|
||||
}
|
||||
|
||||
private Mono<DeviceMessage> handleMessage(DeviceOperator mainDevice,
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import org.jetlinks.core.message.codec.EncodedMessage;
|
|||
import org.jetlinks.core.message.codec.MqttMessage;
|
||||
import org.jetlinks.core.message.codec.SimpleMqttMessage;
|
||||
import org.jetlinks.core.server.mqtt.MqttAuth;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import reactor.core.publisher.*;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
|
@ -41,7 +42,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
private long keepAliveTimeoutMs;
|
||||
@Getter
|
||||
private long lastPingTime = System.currentTimeMillis();
|
||||
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
|
||||
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false;
|
||||
private static final MqttAuth emptyAuth = new MqttAuth() {
|
||||
@Override
|
||||
public String getUsername() {
|
||||
|
|
@ -53,19 +54,9 @@ class VertxMqttConnection implements MqttConnection {
|
|||
return "";
|
||||
}
|
||||
};
|
||||
private final Sinks.Many<MqttPublishing> messageProcessor = Sinks
|
||||
.many()
|
||||
.multicast()
|
||||
.onBackpressureBuffer(Integer.MAX_VALUE);
|
||||
|
||||
private final Sinks.Many<MqttSubscription> subscription = Sinks
|
||||
.many()
|
||||
.multicast()
|
||||
.onBackpressureBuffer(Integer.MAX_VALUE);
|
||||
private final Sinks.Many<MqttUnSubscription> unsubscription = Sinks
|
||||
.many()
|
||||
.multicast()
|
||||
.onBackpressureBuffer(Integer.MAX_VALUE);
|
||||
private final Sinks.Many<MqttPublishing> messageProcessor = Reactors.createMany(Integer.MAX_VALUE, false);
|
||||
private final Sinks.Many<MqttSubscription> subscription = Reactors.createMany(Integer.MAX_VALUE, false);
|
||||
private final Sinks.Many<MqttUnSubscription> unsubscription = Reactors.createMany(Integer.MAX_VALUE, false);
|
||||
|
||||
|
||||
public VertxMqttConnection(MqttEndpoint endpoint) {
|
||||
|
|
@ -178,7 +169,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
publishing.acknowledge();
|
||||
}
|
||||
if (hasDownstream) {
|
||||
this.messageProcessor.tryEmitNext(publishing);
|
||||
this.messageProcessor.emitNext(publishing, Reactors.emitFailureHandler());
|
||||
}
|
||||
})
|
||||
//QoS 1 PUBACK
|
||||
|
|
@ -211,7 +202,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
subscription.acknowledge();
|
||||
}
|
||||
if (hasDownstream) {
|
||||
this.subscription.tryEmitNext(subscription);
|
||||
this.subscription.emitNext(subscription, Reactors.emitFailureHandler());
|
||||
}
|
||||
})
|
||||
.unsubscribeHandler(msg -> {
|
||||
|
|
@ -222,7 +213,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
unSubscription.acknowledge();
|
||||
}
|
||||
if (hasDownstream) {
|
||||
this.unsubscription.tryEmitNext(unSubscription);
|
||||
this.unsubscription.emitNext(unSubscription, Reactors.emitFailureHandler());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import reactor.core.publisher.Mono;
|
|||
import reactor.util.context.Context;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
|
@ -121,22 +122,27 @@ public class DeviceGatewayHelper {
|
|||
.createOrUpdateSession(childrenId,
|
||||
children,
|
||||
child -> Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child)),
|
||||
Mono::empty));
|
||||
|
||||
Mono::empty)
|
||||
.doOnNext(session -> {
|
||||
if (session.isWrapFrom(ChildrenDeviceSession.class)) {
|
||||
ChildrenDeviceSession childrenSession = session.unwrap(ChildrenDeviceSession.class);
|
||||
//网关发生变化,替换新的上级会话
|
||||
if (!Objects.equals(deviceId, childrenSession.getParent().getDeviceId())) {
|
||||
childrenSession.replaceWith(parentSession);
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
//子设备注册
|
||||
if (isDoRegister(children)) {
|
||||
return Mono
|
||||
//延迟2秒,因为自动注册是异步的,收到消息后并不能保证马上可以注册成功.
|
||||
.delay(Duration.ofSeconds(2))
|
||||
.then(registry
|
||||
.getDevice(children.getDeviceId())
|
||||
.flatMap(device -> device
|
||||
//没有配置状态自管理才自动上线
|
||||
.getSelfConfig(DeviceConfigKey.selfManageState)
|
||||
.defaultIfEmpty(false)
|
||||
.filter(Boolean.FALSE::equals))
|
||||
.flatMap(ignore -> sessionHandler))
|
||||
return this
|
||||
.getDeviceForRegister(children.getDeviceId())
|
||||
.flatMap(device -> device
|
||||
//没有配置状态自管理才自动上线
|
||||
.getSelfConfig(DeviceConfigKey.selfManageState)
|
||||
.defaultIfEmpty(false)
|
||||
.filter(Boolean.FALSE::equals))
|
||||
.flatMap(ignore -> sessionHandler)
|
||||
.then();
|
||||
}
|
||||
return sessionHandler.then();
|
||||
|
|
@ -207,7 +213,7 @@ public class DeviceGatewayHelper {
|
|||
}
|
||||
|
||||
if (doHandle) {
|
||||
then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt));
|
||||
then = messageHandler.handleMessage(null, message).then(then);
|
||||
}
|
||||
|
||||
return this
|
||||
|
|
@ -260,6 +266,15 @@ public class DeviceGatewayHelper {
|
|||
.flatMap(Function.identity());
|
||||
}
|
||||
|
||||
private Mono<DeviceOperator> getDeviceForRegister(String deviceId) {
|
||||
return registry
|
||||
.getDevice(deviceId)
|
||||
.switchIfEmpty(Mono.defer(() -> Mono
|
||||
//延迟2秒,因为自动注册是异步的,收到消息后并不能保证马上可以注册成功.
|
||||
.delay(Duration.ofSeconds(2))
|
||||
.then(registry.getDevice(deviceId))));
|
||||
}
|
||||
|
||||
private Mono<DeviceSession> createNewSession(String deviceId,
|
||||
DeviceMessage message,
|
||||
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
|
||||
|
|
@ -298,7 +313,7 @@ public class DeviceGatewayHelper {
|
|||
private Mono<DeviceSession> updateSession0(DeviceSession session,
|
||||
DeviceMessage message,
|
||||
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) {
|
||||
Mono<Void> after = null;
|
||||
Mono<DeviceSession> after = null;
|
||||
//消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话
|
||||
if (isNewKeeOnline(session, message)) {
|
||||
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
|
||||
|
|
@ -310,14 +325,13 @@ public class DeviceGatewayHelper {
|
|||
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
|
||||
after = sessionBuilder
|
||||
.apply(session.getOperator())
|
||||
.map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)))
|
||||
.then();
|
||||
.map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)));
|
||||
}
|
||||
applySessionKeepaliveTimeout(message, session);
|
||||
session.keepAlive();
|
||||
return after == null
|
||||
? Mono.just(session)
|
||||
: after.thenReturn(session);
|
||||
: after;
|
||||
}
|
||||
|
||||
private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) {
|
||||
|
|
@ -348,7 +362,11 @@ public class DeviceGatewayHelper {
|
|||
|
||||
//判断保持在线的会话是否以及丢失(服务重启后可能出现)
|
||||
private static boolean isKeeOnlineLost(DeviceSession session) {
|
||||
return session instanceof KeepOnlineSession && session.isWrapFrom(LostDeviceSession.class);
|
||||
if (!session.isWrapFrom(KeepOnlineSession.class)) {
|
||||
return false;
|
||||
}
|
||||
return session.isWrapFrom(LostDeviceSession.class)
|
||||
|| !session.unwrap(KeepOnlineSession.class).getParent().isAlive();
|
||||
}
|
||||
|
||||
//判断是否为设备注册
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
package org.jetlinks.community.network.tcp.client;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetClient;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
|
|
@ -8,15 +10,15 @@ import lombok.Getter;
|
|||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.jetlinks.community.network.DefaultNetworkType;
|
||||
import org.jetlinks.community.network.NetworkType;
|
||||
import org.jetlinks.community.network.tcp.TcpMessage;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParser;
|
||||
import org.jetlinks.core.message.codec.EncodedMessage;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import org.jetlinks.community.network.DefaultNetworkType;
|
||||
import org.jetlinks.community.network.NetworkType;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Sinks;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketException;
|
||||
|
|
@ -24,28 +26,29 @@ import java.time.Duration;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Slf4j
|
||||
public class VertxTcpClient implements TcpClient {
|
||||
|
||||
public volatile NetClient client;
|
||||
|
||||
public NetSocket socket;
|
||||
|
||||
volatile PayloadParser payloadParser;
|
||||
|
||||
@Getter
|
||||
private final String id;
|
||||
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
|
||||
private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
|
||||
private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
||||
private final boolean serverClient;
|
||||
public volatile NetClient client;
|
||||
public NetSocket socket;
|
||||
volatile PayloadParser payloadParser;
|
||||
|
||||
@Setter
|
||||
private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
|
||||
|
||||
private volatile long lastKeepAliveTime = System.currentTimeMillis();
|
||||
|
||||
public VertxTcpClient(String id, boolean serverClient) {
|
||||
this.id = id;
|
||||
this.serverClient = serverClient;
|
||||
}
|
||||
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final Sinks.Many<TcpMessage> sink = Reactors.createMany();
|
||||
|
||||
private final boolean serverClient;
|
||||
|
||||
@Override
|
||||
public void keepAlive() {
|
||||
|
|
@ -72,15 +75,18 @@ public class VertxTcpClient implements TcpClient {
|
|||
@Override
|
||||
public Mono<Void> sendMessage(EncodedMessage message) {
|
||||
return Mono
|
||||
.create((sink) -> {
|
||||
.<Void>create((sink) -> {
|
||||
if (socket == null) {
|
||||
sink.error(new SocketException("socket closed"));
|
||||
return;
|
||||
}
|
||||
Buffer buffer = Buffer.buffer(message.getPayload());
|
||||
ByteBuf buf = message.getPayload();
|
||||
Buffer buffer = Buffer.buffer(buf);
|
||||
int len = buffer.length();
|
||||
socket.write(buffer, r -> {
|
||||
keepAlive();
|
||||
ReferenceCountUtil.safeRelease(buf);
|
||||
if (r.succeeded()) {
|
||||
keepAlive();
|
||||
sink.success();
|
||||
} else {
|
||||
sink.error(r.cause());
|
||||
|
|
@ -111,23 +117,18 @@ public class VertxTcpClient implements TcpClient {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 接收TCP消息
|
||||
*
|
||||
* @param message TCP消息
|
||||
*/
|
||||
public VertxTcpClient(String id,boolean serverClient) {
|
||||
this.id = id;
|
||||
this.serverClient = serverClient;
|
||||
}
|
||||
|
||||
protected void received(TcpMessage message) {
|
||||
if (processor.getPending() > processor.getBufferSize() / 2) {
|
||||
log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString());
|
||||
return;
|
||||
}
|
||||
sink.next(message);
|
||||
sink.emitNext(message,Reactors.RETRY_NON_SERIALIZED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<TcpMessage> subscribe() {
|
||||
return processor
|
||||
.map(Function.identity());
|
||||
return sink.asFlux();
|
||||
}
|
||||
|
||||
private void execute(Runnable runnable) {
|
||||
|
|
@ -144,7 +145,7 @@ public class VertxTcpClient implements TcpClient {
|
|||
return null;
|
||||
}
|
||||
SocketAddress socketAddress = socket.remoteAddress();
|
||||
return new InetSocketAddress(socketAddress.host(), socketAddress.port());
|
||||
return InetSocketAddress.createUnresolved(socketAddress.host(), socketAddress.port());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -154,6 +155,9 @@ public class VertxTcpClient implements TcpClient {
|
|||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (socket == null) {
|
||||
return;
|
||||
}
|
||||
log.debug("tcp client [{}] disconnect", getId());
|
||||
synchronized (this) {
|
||||
if (null != client) {
|
||||
|
|
@ -174,7 +178,7 @@ public class VertxTcpClient implements TcpClient {
|
|||
}
|
||||
disconnectListener.clear();
|
||||
if (serverClient) {
|
||||
processor.onComplete();
|
||||
sink.tryEmitComplete();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -186,11 +190,6 @@ public class VertxTcpClient implements TcpClient {
|
|||
this.client = client;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置客户端消息解析器
|
||||
*
|
||||
* @param payloadParser 消息解析器
|
||||
*/
|
||||
public void setRecordParser(PayloadParser payloadParser) {
|
||||
synchronized (this) {
|
||||
if (null != this.payloadParser && this.payloadParser != payloadParser) {
|
||||
|
|
@ -199,18 +198,10 @@ public class VertxTcpClient implements TcpClient {
|
|||
this.payloadParser = payloadParser;
|
||||
this.payloadParser
|
||||
.handlePayload()
|
||||
.onErrorContinue((err, res) -> {
|
||||
log.error(err.getMessage(), err);
|
||||
})
|
||||
.subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* socket处理
|
||||
*
|
||||
* @param socket socket
|
||||
*/
|
||||
public void setSocket(NetSocket socket) {
|
||||
synchronized (this) {
|
||||
Objects.requireNonNull(payloadParser);
|
||||
|
|
@ -222,12 +213,12 @@ public class VertxTcpClient implements TcpClient {
|
|||
.handler(buffer -> {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("handle tcp client[{}] payload:[{}]",
|
||||
socket.remoteAddress(),
|
||||
Hex.encodeHexString(buffer.getBytes()));
|
||||
socket.remoteAddress(),
|
||||
Hex.encodeHexString(buffer.getBytes()));
|
||||
}
|
||||
keepAlive();
|
||||
payloadParser.handle(buffer);
|
||||
if (this.socket != socket) {
|
||||
if (this.socket != null && this.socket != socket) {
|
||||
log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
|
||||
socket.close();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
|
|||
netClient.connect(properties.getPort(), properties.getHost(), result -> {
|
||||
if (result.succeeded()) {
|
||||
log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
|
||||
client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties));
|
||||
client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties).get());
|
||||
client.setSocket(result.result());
|
||||
} else {
|
||||
log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause());
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package org.jetlinks.community.network.tcp.device;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.hswebframework.web.logger.ReactiveLogger;
|
||||
import org.jetlinks.community.gateway.AbstractDeviceGateway;
|
||||
|
|
@ -10,6 +11,7 @@ import org.jetlinks.community.network.tcp.TcpMessage;
|
|||
import org.jetlinks.community.network.tcp.client.TcpClient;
|
||||
import org.jetlinks.community.network.tcp.server.TcpServer;
|
||||
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
|
||||
import org.jetlinks.community.utils.TimeUtils;
|
||||
import org.jetlinks.core.ProtocolSupport;
|
||||
import org.jetlinks.core.ProtocolSupports;
|
||||
import org.jetlinks.core.device.DeviceOperator;
|
||||
|
|
@ -52,6 +54,12 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
|
|||
|
||||
private Disposable disposable;
|
||||
|
||||
//连接检查超时时间,超过时间连接没有被正确处理返回会话,将被自动断开连接
|
||||
@Setter
|
||||
private Duration connectCheckTimeout = TimeUtils.parse(
|
||||
System.getProperty("gateway.tcp.network.connect-check-timeout", "10s"));
|
||||
|
||||
|
||||
private final DeviceGatewayHelper helper;
|
||||
|
||||
public TcpServerDeviceGateway(String id,
|
||||
|
|
@ -95,6 +103,7 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
|
|||
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
|
||||
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
|
||||
final InetSocketAddress address;
|
||||
Disposable legalityChecker;
|
||||
|
||||
TcpConnection(TcpClient client) {
|
||||
this.client = client;
|
||||
|
|
@ -111,6 +120,20 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
|
|||
});
|
||||
monitor.connected();
|
||||
sessionRef.set(new UnknownTcpDeviceSession(client.getId(), client, getTransport()));
|
||||
legalityChecker = Schedulers
|
||||
.parallel()
|
||||
.schedule(this::checkLegality, connectCheckTimeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void checkLegality() {
|
||||
//超过时间还未获取到任何设备则认为连接不合法,自动断开连接
|
||||
if ((sessionRef.get() instanceof UnknownTcpDeviceSession)) {
|
||||
log.info("tcp [{}] connection is illegal, close it.", address);
|
||||
try {
|
||||
client.disconnect();
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Mono<Void> accept() {
|
||||
|
|
@ -157,16 +180,22 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSup
|
|||
}
|
||||
|
||||
Mono<DeviceMessage> handleDeviceMessage(DeviceMessage message) {
|
||||
Disposable checker = legalityChecker;
|
||||
if (checker != null) {
|
||||
checker.dispose();
|
||||
legalityChecker = null;
|
||||
}
|
||||
monitor.receivedMessage();
|
||||
return helper
|
||||
.handleDeviceMessage(message,
|
||||
device -> new TcpDeviceSession(device, client, getTransport(), monitor),
|
||||
session -> {
|
||||
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
|
||||
deviceSession.setClient(client);
|
||||
sessionRef.set(deviceSession);
|
||||
},
|
||||
() -> log.warn("TCP{}: The device[{}] in the message body does not exist:{}", address, message.getDeviceId(), message)
|
||||
.handleDeviceMessage(
|
||||
message,
|
||||
device -> new TcpDeviceSession(device, client, getTransport(), monitor),
|
||||
session -> {
|
||||
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
|
||||
deviceSession.setClient(client);
|
||||
sessionRef.set(deviceSession);
|
||||
},
|
||||
() -> log.warn("TCP{}: The device[{}] in the message body does not exist:{}", address, message.getDeviceId(), message)
|
||||
)
|
||||
.thenReturn(message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,14 +9,16 @@ import org.springframework.beans.BeansException;
|
|||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@Component
|
||||
public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPostProcessor {
|
||||
|
||||
private Map<PayloadParserType, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
|
||||
private final Map<PayloadParserType, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
|
||||
|
||||
public DefaultPayloadParserBuilder(){
|
||||
register(new FixLengthPayloadParserBuilder());
|
||||
|
|
@ -25,9 +27,10 @@ public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPo
|
|||
register(new DirectPayloadParserBuilder());
|
||||
}
|
||||
@Override
|
||||
public PayloadParser build(PayloadParserType type, ValueObject configuration) {
|
||||
public Supplier<PayloadParser> build(PayloadParserType type, ValueObject configuration) {
|
||||
|
||||
return Optional.ofNullable(strategyMap.get(type))
|
||||
.map(builder -> builder.build(configuration))
|
||||
.map(builder -> builder.buildLazy(configuration))
|
||||
.orElseThrow(() -> new UnsupportedOperationException("unsupported parser:" + type));
|
||||
}
|
||||
|
||||
|
|
@ -36,7 +39,7 @@ public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPo
|
|||
}
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
public Object postProcessAfterInitialization(@Nonnull Object bean,@Nonnull String beanName) throws BeansException {
|
||||
if (bean instanceof PayloadParserBuilderStrategy) {
|
||||
register(((PayloadParserBuilderStrategy) bean));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,28 +1,32 @@
|
|||
package org.jetlinks.community.network.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Sinks;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 不处理直接返回数据包
|
||||
*
|
||||
* @author zhouhao
|
||||
* @since 1.0
|
||||
*/
|
||||
public class DirectRecordParser implements PayloadParser {
|
||||
|
||||
EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
|
||||
|
||||
private final Sinks.Many<Buffer> sink = Reactors.createMany();
|
||||
|
||||
@Override
|
||||
public void handle(Buffer buffer) {
|
||||
processor.onNext(buffer);
|
||||
sink.emitNext(buffer, Reactors.emitFailureHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Buffer> handlePayload() {
|
||||
return processor.map(Function.identity());
|
||||
return sink.asFlux();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
processor.onComplete();
|
||||
sink.emitComplete(Reactors.emitFailureHandler());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,9 +7,9 @@ import reactor.core.publisher.Flux;
|
|||
* 用于处理TCP粘拆包的解析器,通常一个客户端对应一个解析器.
|
||||
*
|
||||
* @author zhouhao
|
||||
* @see org.jetlinks.community.network.tcp.parser.strateies.PipePayloadParser
|
||||
* @see org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder
|
||||
* @see org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder
|
||||
* @see PipePayloadParser
|
||||
* @see FixLengthPayloadParserBuilder
|
||||
* @see DelimitedPayloadParserBuilder
|
||||
* @since 1.0
|
||||
*/
|
||||
public interface PayloadParser {
|
||||
|
|
|
|||
|
|
@ -2,8 +2,23 @@ package org.jetlinks.community.network.tcp.parser;
|
|||
|
||||
import org.jetlinks.community.ValueObject;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 解析器构造器,用于根据解析器类型和配置信息构造对应的解析器
|
||||
*
|
||||
* @author zhouhao
|
||||
* @since 1.0
|
||||
*/
|
||||
public interface PayloadParserBuilder {
|
||||
|
||||
PayloadParser build(PayloadParserType type, ValueObject configuration);
|
||||
/**
|
||||
* 构造解析器
|
||||
*
|
||||
* @param type 解析器类型
|
||||
* @param configuration 配置信息
|
||||
* @return 解析器
|
||||
*/
|
||||
Supplier<PayloadParser> build(PayloadParserType type, ValueObject configuration);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,36 @@
|
|||
package org.jetlinks.community.network.tcp.parser;
|
||||
|
||||
import org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder;
|
||||
import org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder;
|
||||
import org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 解析器构造器策略,用于实现不同类型的解析器构造逻辑
|
||||
*
|
||||
* @author zhouhao
|
||||
* @since 1.0
|
||||
* @see FixLengthPayloadParserBuilder
|
||||
* @see DelimitedPayloadParserBuilder
|
||||
* @see ScriptPayloadParserBuilder
|
||||
*/
|
||||
public interface PayloadParserBuilderStrategy {
|
||||
/**
|
||||
* @return 解析器类型
|
||||
*/
|
||||
PayloadParserType getType();
|
||||
|
||||
PayloadParser build(ValueObject config);
|
||||
/**
|
||||
* 构造解析器
|
||||
*
|
||||
* @param config 配置信息
|
||||
* @return 解析器
|
||||
*/
|
||||
Supplier<PayloadParser> buildLazy(ValueObject config);
|
||||
|
||||
default PayloadParser build(ValueObject config){
|
||||
return buildLazy(config).get();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import lombok.Getter;
|
|||
import org.hswebframework.web.dict.Dict;
|
||||
import org.hswebframework.web.dict.EnumDict;
|
||||
import org.jetlinks.community.network.tcp.parser.strateies.PipePayloadParser;
|
||||
import org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder;
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
|
|
@ -18,7 +19,7 @@ public enum PayloadParserType implements EnumDict<String> {
|
|||
DELIMITED("分隔符"),
|
||||
|
||||
/**
|
||||
* @see org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder
|
||||
* @see ScriptPayloadParserBuilder
|
||||
* @see PipePayloadParser
|
||||
*/
|
||||
SCRIPT("自定义脚本")
|
||||
|
|
|
|||
|
|
@ -1,10 +1,21 @@
|
|||
package org.jetlinks.community.network.tcp.parser.strateies;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.text.StringEscapeUtils;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 以分隔符读取数据包
|
||||
*
|
||||
* @author zhouhao
|
||||
* @since 1.0
|
||||
*/
|
||||
public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {
|
||||
@Override
|
||||
public PayloadParserType getType() {
|
||||
|
|
@ -12,12 +23,22 @@ public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected RecordParser createParser(ValueObject config) {
|
||||
@SneakyThrows
|
||||
protected Supplier<RecordParser> createParser(ValueObject config) {
|
||||
|
||||
return RecordParser.newDelimited(StringEscapeUtils.unescapeJava(
|
||||
config
|
||||
.getString("delimited")
|
||||
.orElseThrow(() -> new IllegalArgumentException("delimited can not be null"))));
|
||||
String delimited = config
|
||||
.getString("delimited")
|
||||
.map(String::trim)
|
||||
.orElseThrow(() -> new IllegalArgumentException("delimited can not be null"));
|
||||
|
||||
if (delimited.startsWith("0x")) {
|
||||
|
||||
byte[] hex = Hex.decodeHex(delimited.substring(2));
|
||||
return () -> RecordParser
|
||||
.newDelimited(Buffer.buffer(hex));
|
||||
}
|
||||
|
||||
return () -> RecordParser.newDelimited(StringEscapeUtils.unescapeJava(delimited));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,11 +1,13 @@
|
|||
package org.jetlinks.community.network.tcp.parser.strateies;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
import org.jetlinks.community.network.tcp.parser.DirectRecordParser;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParser;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class DirectPayloadParserBuilder implements PayloadParserBuilderStrategy {
|
||||
|
||||
|
|
@ -16,7 +18,7 @@ public class DirectPayloadParserBuilder implements PayloadParserBuilderStrategy
|
|||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public PayloadParser build(ValueObject config) {
|
||||
return new DirectRecordParser();
|
||||
public Supplier<PayloadParser> buildLazy(ValueObject config) {
|
||||
return DirectRecordParser::new;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,17 @@
|
|||
package org.jetlinks.community.network.tcp.parser.strateies;
|
||||
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 固定长度解析器构造器,每次读取固定长度的数据包
|
||||
*
|
||||
* @author zhouhao
|
||||
* @since 1.0
|
||||
*/
|
||||
public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {
|
||||
@Override
|
||||
public PayloadParserType getType() {
|
||||
|
|
@ -11,9 +19,11 @@ public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected RecordParser createParser(ValueObject config) {
|
||||
return RecordParser.newFixed(config.getInt("size")
|
||||
.orElseThrow(() -> new IllegalArgumentException("size can not be null")));
|
||||
protected Supplier<RecordParser> createParser(ValueObject config) {
|
||||
int size = config.getInt("size")
|
||||
.orElseThrow(() -> new IllegalArgumentException("size can not be null"));
|
||||
|
||||
return () -> RecordParser.newFixed(size);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -4,13 +4,14 @@ import io.vertx.core.buffer.Buffer;
|
|||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParser;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Sinks;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
|
@ -28,11 +29,12 @@ import java.util.function.Function;
|
|||
@Slf4j
|
||||
public class PipePayloadParser implements PayloadParser {
|
||||
|
||||
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
|
||||
private final static AtomicIntegerFieldUpdater<PipePayloadParser> CURRENT_PIPE =
|
||||
AtomicIntegerFieldUpdater.newUpdater(PipePayloadParser.class, "currentPipe");
|
||||
|
||||
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
||||
private final Sinks.Many<Buffer> sink = Reactors.createMany();
|
||||
|
||||
private final List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
|
||||
private final List<BiConsumer<Buffer, PipePayloadParser>> pipe = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final List<Buffer> result = new CopyOnWriteArrayList<>();
|
||||
|
||||
|
|
@ -42,7 +44,7 @@ public class PipePayloadParser implements PayloadParser {
|
|||
|
||||
private Consumer<RecordParser> firstInit;
|
||||
|
||||
private final AtomicInteger currentPipe = new AtomicInteger();
|
||||
private volatile int currentPipe;
|
||||
|
||||
public Buffer newBuffer() {
|
||||
return Buffer.buffer();
|
||||
|
|
@ -57,10 +59,12 @@ public class PipePayloadParser implements PayloadParser {
|
|||
}
|
||||
|
||||
public PipePayloadParser handler(Consumer<Buffer> handler) {
|
||||
pipe.add(handler);
|
||||
|
||||
pipe.add((buffer, parser) -> handler.accept(buffer));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public PipePayloadParser delimited(String delimited) {
|
||||
if (recordParser == null) {
|
||||
setParser(RecordParser.newDelimited(delimited));
|
||||
|
|
@ -90,22 +94,22 @@ public class PipePayloadParser implements PayloadParser {
|
|||
return this;
|
||||
}
|
||||
|
||||
private Consumer<Buffer> getNextHandler() {
|
||||
int i = currentPipe.getAndIncrement();
|
||||
private BiConsumer<Buffer, PipePayloadParser> getNextHandler() {
|
||||
int i = CURRENT_PIPE.getAndIncrement(this);
|
||||
if (i < pipe.size()) {
|
||||
return pipe.get(i);
|
||||
}
|
||||
currentPipe.set(0);
|
||||
CURRENT_PIPE.set(this, 0);
|
||||
return pipe.get(0);
|
||||
}
|
||||
|
||||
private void setParser(RecordParser parser) {
|
||||
this.recordParser = parser;
|
||||
this.recordParser.handler(buffer -> getNextHandler().accept(buffer));
|
||||
this.recordParser.handler(buffer -> getNextHandler().accept(buffer, this));
|
||||
}
|
||||
|
||||
public PipePayloadParser complete() {
|
||||
currentPipe.set(0);
|
||||
CURRENT_PIPE.set(this, 0);
|
||||
if (recordParser != null) {
|
||||
firstInit.accept(recordParser);
|
||||
}
|
||||
|
|
@ -115,7 +119,7 @@ public class PipePayloadParser implements PayloadParser {
|
|||
buffer.appendBuffer(buf);
|
||||
}
|
||||
this.result.clear();
|
||||
sink.next(buffer);
|
||||
sink.emitNext(buffer, Reactors.emitFailureHandler());
|
||||
}
|
||||
return this;
|
||||
|
||||
|
|
@ -138,13 +142,13 @@ public class PipePayloadParser implements PayloadParser {
|
|||
}
|
||||
Buffer buf = directMapper.apply(buffer);
|
||||
if (null != buf) {
|
||||
sink.next(buf);
|
||||
sink.emitNext(buf, Reactors.emitFailureHandler());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Buffer> handlePayload() {
|
||||
return processor.map(Function.identity());
|
||||
return sink.asFlux();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -155,8 +159,8 @@ public class PipePayloadParser implements PayloadParser {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
processor.onComplete();
|
||||
currentPipe.set(0);
|
||||
sink.tryEmitComplete();
|
||||
CURRENT_PIPE.set(this, 0);
|
||||
this.result.clear();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,16 +1,17 @@
|
|||
package org.jetlinks.community.network.tcp.parser.strateies;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.hswebframework.expands.script.engine.DynamicScriptEngine;
|
||||
import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
import org.hswebframework.web.utils.DigestUtils;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParser;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy {
|
||||
@Override
|
||||
|
|
@ -20,24 +21,33 @@ public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy
|
|||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public PayloadParser build(ValueObject config) {
|
||||
public Supplier<PayloadParser> buildLazy(ValueObject config) {
|
||||
String script = config.getString("script")
|
||||
.orElseThrow(() -> new IllegalArgumentException("script不能为空"));
|
||||
.orElseThrow(() -> new IllegalArgumentException("script不能为空"));
|
||||
String lang = config.getString("lang")
|
||||
.orElseThrow(() -> new IllegalArgumentException("lang不能为空"));
|
||||
.orElse("js");
|
||||
|
||||
DynamicScriptEngine engine = DynamicScriptEngineFactory.getEngine(lang);
|
||||
if (engine == null) {
|
||||
throw new IllegalArgumentException("不支持的脚本:" + lang);
|
||||
}
|
||||
PipePayloadParser parser = new PipePayloadParser();
|
||||
|
||||
String id = DigestUtils.md5Hex(script);
|
||||
if (!engine.compiled(id)) {
|
||||
engine.compile(id, script);
|
||||
}
|
||||
Map<String, Object> ctx = new HashMap<>();
|
||||
ctx.put("parser", parser);
|
||||
engine.execute(id, ctx).getIfSuccess();
|
||||
return parser;
|
||||
return () -> {
|
||||
PipePayloadParser parser = new PipePayloadParser();
|
||||
|
||||
Map<String, Object> ctx = new HashMap<>();
|
||||
ctx.put("parser", parser);
|
||||
try {
|
||||
engine.execute(id, ctx).getIfSuccess();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return parser;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,32 +2,31 @@ package org.jetlinks.community.network.tcp.parser.strateies;
|
|||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParser;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import org.jetlinks.community.ValueObject;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Sinks;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderStrategy {
|
||||
@Override
|
||||
public abstract PayloadParserType getType();
|
||||
|
||||
protected abstract RecordParser createParser(ValueObject config);
|
||||
protected abstract Supplier<RecordParser> createParser(ValueObject config);
|
||||
|
||||
@Override
|
||||
public PayloadParser build(ValueObject config) {
|
||||
return new RecordPayloadParser(() -> createParser(config));
|
||||
public Supplier<PayloadParser> buildLazy(ValueObject config) {
|
||||
Supplier<RecordParser> parser = createParser(config);
|
||||
return () -> new RecordPayloadParser(parser);
|
||||
}
|
||||
|
||||
static class RecordPayloadParser implements PayloadParser {
|
||||
private final Supplier<RecordParser> recordParserSupplier;
|
||||
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
|
||||
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
||||
private final Sinks.Many<Buffer> sink = Reactors.createMany();
|
||||
|
||||
private RecordParser recordParser;
|
||||
|
||||
|
|
@ -43,18 +42,20 @@ public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderS
|
|||
|
||||
@Override
|
||||
public Flux<Buffer> handlePayload() {
|
||||
return processor.map(Function.identity());
|
||||
return sink.asFlux();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
processor.onComplete();
|
||||
sink.emitComplete(Reactors.emitFailureHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
this.recordParser = recordParserSupplier.get();
|
||||
this.recordParser.handler(sink::next);
|
||||
this.recordParser.handler(payload -> {
|
||||
sink.emitNext(payload, Reactors.emitFailureHandler());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
|
|||
}
|
||||
// 根据解析类型配置数据解析器
|
||||
payloadParserBuilder.build(properties.getParserType(), properties);
|
||||
tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), properties));
|
||||
tcpServer.setParserSupplier( payloadParserBuilder.build(properties.getParserType(), properties));
|
||||
tcpServer.setServer(instances);
|
||||
tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
|
||||
// 针对JVM做的多路复用优化
|
||||
|
|
|
|||
|
|
@ -10,9 +10,11 @@ import org.jetlinks.community.network.NetworkType;
|
|||
import org.jetlinks.community.network.tcp.client.TcpClient;
|
||||
import org.jetlinks.community.network.tcp.client.VertxTcpClient;
|
||||
import org.jetlinks.community.network.tcp.parser.PayloadParser;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Sinks;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
|
|
@ -28,8 +30,8 @@ public class VertxTcpServer implements TcpServer {
|
|||
|
||||
@Getter
|
||||
private final String id;
|
||||
private final EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
|
||||
private final FluxSink<TcpClient> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
||||
|
||||
private final Sinks.Many<TcpClient> sink = Reactors.createMany(Integer.MAX_VALUE, false);
|
||||
Collection<NetServer> tcpServers;
|
||||
private Supplier<PayloadParser> parserSupplier;
|
||||
@Setter
|
||||
|
|
@ -41,8 +43,7 @@ public class VertxTcpServer implements TcpServer {
|
|||
|
||||
@Override
|
||||
public Flux<TcpClient> handleConnection() {
|
||||
return processor
|
||||
.map(Function.identity());
|
||||
return sink.asFlux();
|
||||
}
|
||||
|
||||
private void execute(Runnable runnable) {
|
||||
|
|
@ -80,7 +81,7 @@ public class VertxTcpServer implements TcpServer {
|
|||
* @param socket socket
|
||||
*/
|
||||
protected void acceptTcpConnection(NetSocket socket) {
|
||||
if (!processor.hasDownstreams()) {
|
||||
if (sink.currentSubscriberCount() == 0) {
|
||||
log.warn("not handler for tcp client[{}]", socket.remoteAddress());
|
||||
socket.close();
|
||||
return;
|
||||
|
|
@ -101,8 +102,7 @@ public class VertxTcpServer implements TcpServer {
|
|||
// 调用坐标 org.jetlinks.community.network.tcp.server.TcpServerProvider.initTcpServer
|
||||
client.setRecordParser(parserSupplier.get());
|
||||
client.setSocket(socket);
|
||||
// client放进了发射器
|
||||
sink.next(client);
|
||||
sink.emitNext(client, Reactors.emitFailureHandler());
|
||||
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
|
||||
} catch (Exception e) {
|
||||
log.error("create tcp server client error", e);
|
||||
|
|
@ -118,6 +118,7 @@ public class VertxTcpServer implements TcpServer {
|
|||
@Override
|
||||
public void shutdown() {
|
||||
if (null != tcpServers) {
|
||||
log.debug("close tcp server :[{}]", id);
|
||||
for (NetServer tcpServer : tcpServers) {
|
||||
execute(tcpServer::close);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,24 @@
|
|||
package org.jetlinks.community.standalone.web;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@ConfigurationProperties(prefix = "api")
|
||||
@Component
|
||||
public class ApiInfoProperties {
|
||||
|
||||
@Schema(description = "api根路径")
|
||||
private String basePath;
|
||||
|
||||
@Schema(description = "api地址信息")
|
||||
private Map<String, String> urls = new ConcurrentHashMap<>();
|
||||
|
||||
}
|
||||
|
|
@ -1,20 +1,54 @@
|
|||
package org.jetlinks.community.standalone.web;
|
||||
|
||||
import org.hswebframework.web.authorization.annotation.Authorize;
|
||||
import org.jetlinks.community.Version;
|
||||
import lombok.Getter;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@RequestMapping("/system")
|
||||
@RestController
|
||||
public class SystemInfoController {
|
||||
|
||||
@GetMapping("/version")
|
||||
@Authorize(ignore = true)
|
||||
public Mono<Version> getVersion() {
|
||||
return Mono.just(Version.current);
|
||||
private static final Map<String, Object> versionResponse;
|
||||
|
||||
private final ApiInfoProperties infoProperties;
|
||||
|
||||
static {
|
||||
versionResponse = new HashMap<>();
|
||||
versionResponse.put("result", new Version());
|
||||
versionResponse.put("status", 200);
|
||||
versionResponse.put("code", "success");
|
||||
}
|
||||
|
||||
public SystemInfoController(ApiInfoProperties infoProperties) {
|
||||
this.infoProperties = infoProperties;
|
||||
}
|
||||
|
||||
@GetMapping("/version")
|
||||
public Mono<Map<String, Object>> getVersion() {
|
||||
|
||||
return Mono.just(versionResponse);
|
||||
}
|
||||
|
||||
@GetMapping("/apis")
|
||||
public Mono<Map<String, Object>> getApis() {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("result", infoProperties);
|
||||
map.put("status", 200);
|
||||
map.put("code", "success");
|
||||
return Mono.just(map);
|
||||
}
|
||||
|
||||
|
||||
@Getter
|
||||
public static class Version {
|
||||
private final String edition = "pro";
|
||||
private final String version = "1.1.0-SNAPSHOT";
|
||||
private final String mode = "cloud";
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,10 +18,10 @@ spring:
|
|||
elasticsearch:
|
||||
client:
|
||||
reactive:
|
||||
endpoints: ${elasticsearch.client.host}:${elasticsearch.client.port}
|
||||
endpoints: localhost:9201
|
||||
max-in-memory-size: 100MB
|
||||
socket-timeout: ${elasticsearch.client.socket-timeout}
|
||||
connection-timeout: ${elasticsearch.client.socket-timeout}
|
||||
socket-timeout: 5000
|
||||
connection-timeout: 8000
|
||||
easyorm:
|
||||
default-schema: PUBLIC # 数据库默认的schema
|
||||
dialect: h2 #数据库方言
|
||||
|
|
@ -31,13 +31,6 @@ elasticsearch:
|
|||
data-path: ./data/elasticsearch
|
||||
port: 9201
|
||||
host: 0.0.0.0
|
||||
client:
|
||||
host: localhost
|
||||
port: 9201
|
||||
max-conn-total: 128
|
||||
connect-timeout: 5000
|
||||
socket-timeout: 5000
|
||||
connection-request-timeout: 8000
|
||||
index:
|
||||
default-strategy: time-by-month #默认es的索引按月进行分表, direct则为直接操作索引.
|
||||
settings:
|
||||
|
|
|
|||
|
|
@ -109,6 +109,9 @@ hsweb:
|
|||
file:
|
||||
manager:
|
||||
storage-base-path: ./data/files
|
||||
api:
|
||||
# 访问系统接口的根地址
|
||||
base-path: http://127.0.0.1:${server.port}
|
||||
|
||||
jetlinks:
|
||||
server-id: ${spring.application.name}:${server.port} #设备服务网关服务ID,不同服务请设置不同的ID
|
||||
|
|
|
|||
12
pom.xml
12
pom.xml
|
|
@ -19,10 +19,10 @@
|
|||
<spring.boot.version>2.5.13</spring.boot.version>
|
||||
<java.version>1.8</java.version>
|
||||
<project.build.jdk>${java.version}</project.build.jdk>
|
||||
<hsweb.framework.version>4.0.15-SNAPSHOT</hsweb.framework.version>
|
||||
<easyorm.version>4.1.0-SNAPSHOT</easyorm.version>
|
||||
<hsweb.framework.version>4.0.15</hsweb.framework.version>
|
||||
<easyorm.version>4.1.0</easyorm.version>
|
||||
<hsweb.expands.version>3.0.2</hsweb.expands.version>
|
||||
<jetlinks.version>1.2.0-SNAPSHOT</jetlinks.version>
|
||||
<jetlinks.version>1.2.0</jetlinks.version>
|
||||
<r2dbc.version>Borca-SR1</r2dbc.version>
|
||||
<netty.version>4.1.74.Final</netty.version>
|
||||
<elasticsearch.version>7.11.2</elasticsearch.version>
|
||||
|
|
@ -521,7 +521,7 @@
|
|||
<repository>
|
||||
<id>hsweb-nexus</id>
|
||||
<name>Nexus Release Repository</name>
|
||||
<url>https://nexus.hsweb.me/content/groups/public/</url>
|
||||
<url>https://nexus.jetlinks.cn/content/groups/public/</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
<updatePolicy>always</updatePolicy>
|
||||
|
|
@ -534,12 +534,12 @@
|
|||
<repository>
|
||||
<id>releases</id>
|
||||
<name>Nexus Release Repository</name>
|
||||
<url>https://nexus.hsweb.me/content/repositories/releases/</url>
|
||||
<url>https://nexus.jetlinks.cn/content/repositories/releases/</url>
|
||||
</repository>
|
||||
<snapshotRepository>
|
||||
<id>snapshots</id>
|
||||
<name>Nexus Snapshot Repository</name>
|
||||
<url>https://nexus.hsweb.me/content/repositories/snapshots/</url>
|
||||
<url>https://nexus.jetlinks.cn/content/repositories/snapshots/</url>
|
||||
</snapshotRepository>
|
||||
</distributionManagement>
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue