Compare commits

...

2 Commits
2.11 ... 2.2

Author SHA1 Message Date
老周 f70f761067
Merge pull request #596 from zxl1951/fix-td-save-2.2
fix(存储策略): 涛思数据库存储使用无模式写入创建表缺失messageId列
2024-12-13 14:37:49 +08:00
ZxL dea91e8353 fix(存储策略): 涛思数据库存储使用无模式写入创建表缺失messageId列 2024-12-13 12:06:04 +08:00
5 changed files with 29 additions and 4 deletions

View File

@ -13,6 +13,8 @@ import reactor.core.publisher.Mono;
import java.util.*;
import static org.jetlinks.community.things.data.ThingsDataConstants.COLUMN_MESSAGE_ID;
class TDengineColumnModeSaveOperations extends ColumnModeSaveOperationsBase {
private final TDengineThingDataHelper helper;
@ -34,7 +36,12 @@ class TDengineColumnModeSaveOperations extends ColumnModeSaveOperationsBase {
protected String createPropertyDataId(ThingMessage message) {
return message.getMessageId();
}
@Override
protected Map<String, Object> createLogData(String templateId, ThingMessage message) {
Map<String, Object> data = super.createLogData(templateId,message);
data.put(COLUMN_MESSAGE_ID, Objects.isNull(message.getMessageId()) ? "" : message.getMessageId());
return data;
}
@Override
protected Map<String, Object> handlePropertiesData(ThingMetadata metadata, Map<String, Object> properties) {
properties = super.handlePropertiesData(metadata, properties);

View File

@ -13,6 +13,8 @@ import reactor.core.publisher.Mono;
import java.util.*;
import static org.jetlinks.community.things.data.ThingsDataConstants.COLUMN_MESSAGE_ID;
class TDengineRowModeSaveOperations extends RowModeSaveOperationsBase {
private final TDengineThingDataHelper helper;
@ -62,6 +64,12 @@ class TDengineRowModeSaveOperations extends RowModeSaveOperationsBase {
return helper.doSave(metric, data, this::isTagValue);
}
@Override
protected Map<String, Object> createLogData(String templateId, ThingMessage message) {
Map<String, Object> data = super.createLogData(templateId,message);
data.put(COLUMN_MESSAGE_ID, Objects.isNull(message.getMessageId()) ? "" : message.getMessageId());
return data;
}
@Override
protected Mono<Void> doSave(String metric, Flux<TimeSeriesData> data) {
return helper.doSave(metric, data, this::isTagValue);

View File

@ -73,7 +73,7 @@ public abstract class AbstractSaveOperations implements SaveOperations {
.then();
}
protected Map<String, Object> createLogData(ThingMessage message) {
protected Map<String, Object> createLogData(String templateId, ThingMessage message) {
Map<String, Object> data = Maps.newHashMapWithExpectedSize(8);
data.put(COLUMN_ID, getOrCreateUid(message));
data.put(metricBuilder.getThingIdProperty(), message.getThingId());
@ -81,6 +81,9 @@ public abstract class AbstractSaveOperations implements SaveOperations {
data.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
data.put(COLUMN_MESSAGE_ID, message.getMessageId());
data.put(COLUMN_LOG_TYPE, ThingLogType.of(message).name());
if (settings.getLogFilter().isAllInOne()) {
data.put(metricBuilder.getTemplateIdProperty(), templateId);
}
String log;
if (message instanceof DeviceLogMessage) {
log = ((DeviceLogMessage) message).getLog();
@ -323,7 +326,7 @@ public abstract class AbstractSaveOperations implements SaveOperations {
return Mono.just(Tuples.of(
metricBuilder.createLogMetric(message.getThingType(), templateId, message.getThingId()),
TimeSeriesData.of(message.getTimestamp(), createLogData(message))));
TimeSeriesData.of(message.getTimestamp(), createLogData(templateId,message))));
}
protected abstract Flux<Tuple2<String, TimeSeriesData>> convertProperties(String templateId,

View File

@ -19,7 +19,8 @@ public class DataSettings {
@Getter
@Setter
public static class Log extends MessageTypeMatcher {
//使用同一个表来存储所有的日志数据
private boolean allInOne = false;
}
@Getter

View File

@ -36,4 +36,10 @@ public interface MetricBuilder {
String eventId) {
return thingType + "_event_" + thingTemplateId + "_" + eventId;
}
/**
* @return 物模版ID的字段标识, 默认为{@link ThingsDataConstants#COLUMN_THING_ID}
*/
default String getTemplateIdProperty() {
return ThingsDataConstants.COLUMN_TEMPLATE_ID;
}
}