Просмотр исходного кода

修改问题:1、所有表 timestamp 存时间类型 pe_iot 后缀80去掉

     2、accept、fault、issue 这3个改成redis存,redis判断有变化,每30s备份到mysql, 读直接从redis读(跟pe_iot的逻辑一样)
     3、status表 fleet_id去掉, 逻辑也是redis为主 (跟pe_iot的逻辑一样) id必须有
     4、issue、fault、accept 的 update_time没用,去掉
     5、redis数据变化逻辑,忽略时间timetsamp变化
     6、alert 坐标去掉 加上id
mqy20260511
lenovo 2 недель назад
Родитель
Сommit
0928f0224b
18 измененных файлов: 323 добавлений и 106 удалений
  1. 9
    0
      iot-platform/src/main/java/com/iot/platform/domain/SysRealtime.java
  2. 1
    3
      iot-platform/src/main/java/com/iot/platform/mapper/SysAlarmMapper.java
  3. 13
    2
      iot-platform/src/main/java/com/iot/platform/mapper/SysControllerMapper.java
  4. 6
    8
      iot-platform/src/main/java/com/iot/platform/mapper/SysFaultMapper.java
  5. 1
    0
      iot-platform/src/main/java/com/iot/platform/mapper/SysRealtimeMapper.java
  6. 24
    14
      iot-platform/src/main/java/com/iot/platform/mqtt/MqttFaultConsumer.java
  7. 99
    21
      iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java
  8. 56
    11
      iot-platform/src/main/java/com/iot/platform/mqtt/MqttStatusConsumer.java
  9. 2
    2
      iot-platform/src/main/java/com/iot/platform/service/SysAlarmService.java
  10. 12
    2
      iot-platform/src/main/java/com/iot/platform/service/SysControllerService.java
  11. 8
    10
      iot-platform/src/main/java/com/iot/platform/service/SysFaultService.java
  12. 5
    0
      iot-platform/src/main/java/com/iot/platform/service/SysRealtimeService.java
  13. 47
    21
      iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java
  14. 2
    2
      iot-platform/src/main/resources/mapper/SysAlarmMapper.xml
  15. 19
    2
      iot-platform/src/main/resources/mapper/SysControllerMapper.xml
  16. 6
    7
      iot-platform/src/main/resources/mapper/SysFaultMapper.xml
  17. 8
    0
      iot-platform/src/main/resources/mapper/SysRealtimeMapper.xml
  18. 5
    1
      iot-platform/src/test/java/com/iot/platform/mqtt/MqttStatusConsumerTest.java

+ 9
- 0
iot-platform/src/main/java/com/iot/platform/domain/SysRealtime.java Просмотреть файл

@@ -1,12 +1,21 @@
1 1
 package com.iot.platform.domain;
2 2
 
3 3
 public class SysRealtime {
4
+    private Long id;
4 5
     private String createTime;
5 6
     private String deviceId;
6 7
     private String timestamp;
7 8
     private String k;
8 9
     private String v;
9 10
 
11
+    public Long getId() {
12
+        return id;
13
+    }
14
+
15
+    public void setId(Long id) {
16
+        this.id = id;
17
+    }
18
+
10 19
     public String getCreateTime() {
11 20
         return createTime;
12 21
     }

+ 1
- 3
iot-platform/src/main/java/com/iot/platform/mapper/SysAlarmMapper.java Просмотреть файл

@@ -11,7 +11,5 @@ public interface SysAlarmMapper {
11 11
                      @Param("createtime")String createtime,
12 12
                      @Param("messageType")String messageType,
13 13
                      @Param("controllerId")String controllerId,
14
-                     @Param("deviceId")String deviceId,
15
-                     @Param("longitude")String longitude,
16
-                     @Param("latitude")String latitude);
14
+                     @Param("deviceId")String deviceId);
17 15
 }

+ 13
- 2
iot-platform/src/main/java/com/iot/platform/mapper/SysControllerMapper.java Просмотреть файл

@@ -40,8 +40,19 @@ public interface SysControllerMapper {
40 40
                                 @Param("fleetId") String fleetId,
41 41
                                 @Param("name") String name,
42 42
                                 @Param("path") String path,
43
-                                @Param("deviceId") String deviceId,
44
-                                @Param("updateTime") String updateTime);
43
+                                @Param("deviceId") String deviceId);
44
+
45
+    void updateSysControllerCmd(@Param("controllerId") String controllerId,
46
+                                @Param("timestamp") String timestamp,
47
+                                @Param("fleetId") String fleetId,
48
+                                @Param("name") String name,
49
+                                @Param("path") String path);
50
+
51
+    void updateSysControllerFault(@Param("controllerId") String controllerId,
52
+                                  @Param("timestamp") String timestamp,
53
+                                  @Param("fleetId") String fleetId,
54
+                                  @Param("name") String name,
55
+                                  @Param("path") String path);
45 56
 
46 57
     SysController selectControllerPath(@Param("path") String path);
47 58
 

+ 6
- 8
iot-platform/src/main/java/com/iot/platform/mapper/SysFaultMapper.java Просмотреть файл

@@ -6,25 +6,23 @@ import org.apache.ibatis.annotations.Param;
6 6
 @Mapper
7 7
 public interface SysFaultMapper {
8 8
 
9
-    void insertFault(@Param("faultId") String faultId,
9
+    void insertAlertData(@Param("faultId") String faultId,
10 10
                      @Param("faultdescs") String faultdescs,
11 11
                      @Param("faultstatus") String faultstatus,
12 12
                      @Param("createtime") String createtime,
13 13
                      @Param("messageType") String messageType,
14 14
                      @Param("controllerId") String controllerId,
15 15
                      @Param("deviceId") String deviceId,
16
-                     @Param("longitude") String longitude,
17
-                     @Param("latitude") String latitude,
18
-                     @Param("readpeople") String readpeople);
16
+                     @Param("readpeople") String readpeople,
17
+                     @Param("timestamp") String timestamp);
19 18
 
20
-    void updateFault(@Param("faultstatus")String faultstatus,
19
+    void updateAlertData(@Param("faultstatus")String faultstatus,
21 20
                      @Param("messageType")String messageType,
22
-                     @Param("longitude")String longitude,
23
-                     @Param("latitude")String latitude,
24 21
                      @Param("faultdescs")String faultdescs,
25 22
                      @Param("controllerId")String controllerId,
26 23
                      @Param("deviceId")String deviceId,
27
-                     @Param("createtime")String createtime);
24
+                     @Param("createtime")String createtime,
25
+                     @Param("timestamp")String timestamp);
28 26
 
29 27
 
30 28
     Integer selectFaultCount(@Param("fleetId")String fleetId);

+ 1
- 0
iot-platform/src/main/java/com/iot/platform/mapper/SysRealtimeMapper.java Просмотреть файл

@@ -8,6 +8,7 @@ import java.util.List;
8 8
 @Mapper
9 9
 public interface SysRealtimeMapper {
10 10
     void createRealtime(@Param("tableName")String tableName);
11
+    int tableExists(@Param("tableName")String tableName);
11 12
     List<String> selectTables();
12 13
     void insertTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId, @Param("timestamp")String timestamp, @Param("k")String k, @Param("v")Object v);
13 14
     void updateTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("v")Object v, @Param("timestamp")String timestamp, @Param("k")String k, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId);

+ 24
- 14
iot-platform/src/main/java/com/iot/platform/mqtt/MqttFaultConsumer.java Просмотреть файл

@@ -3,7 +3,6 @@ package com.iot.platform.mqtt;
3 3
 import com.alibaba.fastjson2.JSON;
4 4
 import com.alibaba.fastjson2.TypeReference;
5 5
 import com.iot.platform.config.IotProperties;
6
-import com.iot.platform.domain.SysDevice;
7 6
 import com.iot.platform.domain.SysFault;
8 7
 import com.iot.platform.service.*;
9 8
 import com.iot.platform.common.utils.NumericIdGenerator;
@@ -14,8 +13,10 @@ import org.springframework.web.client.RestTemplate;
14 13
 import org.springframework.web.util.UriComponentsBuilder;
15 14
 
16 15
 import java.sql.SQLException;
16
+import java.time.Instant;
17 17
 import java.time.LocalDate;
18 18
 import java.time.LocalDateTime;
19
+import java.time.ZoneId;
19 20
 import java.time.format.DateTimeFormatter;
20 21
 import java.util.*;
21 22
 import java.util.concurrent.ExecutorService;
@@ -38,6 +39,9 @@ public class MqttFaultConsumer extends AbstractMqttConsumer {
38 39
     private static final String ALARM_STATUS_TRIGGER = "0";
39 40
     private static final String ALARM_STATUS_RECOVERED = "1";
40 41
     private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
42
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter
43
+            .ofPattern(DATE_TIME_PATTERN)
44
+            .withZone(ZoneId.systemDefault());
41 45
     private static final String COMPANY_ID_PREFIX = "GJ";
42 46
     private static final Pattern CONTROLLER_ID_PATTERN = Pattern.compile("^[a-zA-Z0-9_]+$");
43 47
 
@@ -174,25 +178,19 @@ public class MqttFaultConsumer extends AbstractMqttConsumer {
174 178
             sysFaultService.createMessage(controllerName);
175 179
         }
176 180
 
177
-        SysDevice longitude = sysControllerService.selectCoordinates(topics[0], "经度");
178
-        SysDevice latitude = sysControllerService.selectCoordinates(topics[0], "纬度");
179
-        if (longitude == null || latitude == null) {
180
-            log.info("未查询到控制器经纬度信息: {}", topics[0]);
181
-            return;
182
-        }
183
-        String longitudeValue = longitude.getV();
184
-        String latitudeValue = latitude.getV();
181
+        String timestamp = faultData.getTimestamp();
182
+        String dateTime = toDateTime(timestamp);
185 183
         String companyId = COMPANY_ID_PREFIX + numericIdGenerator.nextId();
186 184
         LocalDateTime currentTime = LocalDateTime.now();
187 185
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DATE_TIME_PATTERN);
188 186
         String currentTimeStr = currentTime.format(formatter);
189 187
 
190 188
         if ("触发".equals(type)) {
191
-            sysAlarmService.insertAlarm(controllerName, companyId, desc, ALARM_STATUS_TRIGGER, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId, longitudeValue, latitudeValue);
192
-            sysFaultService.insertFault(companyId, desc, ALARM_STATUS_TRIGGER, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId, longitudeValue, latitudeValue, "");
189
+//            sysAlarmService.insertAlarm(controllerName, companyId, desc, ALARM_STATUS_TRIGGER, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId);
190
+            sysFaultService.insertAlertData(companyId, desc, ALARM_STATUS_TRIGGER, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId, "", dateTime);
193 191
         } else if ("恢复".equals(type)) {
194
-            sysAlarmService.insertAlarm(controllerName, companyId, desc, ALARM_STATUS_RECOVERED, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId, longitudeValue, latitudeValue);
195
-            sysFaultService.updateFault(ALARM_STATUS_RECOVERED, ALARM_STATUS_TRIGGER, longitudeValue, latitudeValue, desc, controllerId, deviceId, currentTimeStr);
192
+//            sysAlarmService.insertAlarm(controllerName, companyId, desc, ALARM_STATUS_RECOVERED, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId);
193
+            sysFaultService.updateAlertData(ALARM_STATUS_RECOVERED, ALARM_STATUS_TRIGGER, desc, controllerId, deviceId, currentTimeStr, dateTime);
196 194
         }
197 195
 
198 196
         // controllerId 白名单校验,防止 SSRF
@@ -210,4 +208,16 @@ public class MqttFaultConsumer extends AbstractMqttConsumer {
210 208
             log.info("Webhook 调用失败,不影响主流程: {}", e.getMessage());
211 209
         }
212 210
     }
213
-}
211
+
212
+    private String toDateTime(String timestamp) {
213
+        long ts;
214
+        try {
215
+            ts = Long.parseLong(timestamp);
216
+        } catch (NumberFormatException e) {
217
+            log.info("timestamp 格式错误: {},使用当前时间", timestamp);
218
+            ts = System.currentTimeMillis();
219
+        }
220
+        return DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(ts));
221
+    }
222
+
223
+}

+ 99
- 21
iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java Просмотреть файл

@@ -14,8 +14,11 @@ import java.time.Instant;
14 14
 import java.time.ZoneId;
15 15
 import java.time.format.DateTimeFormatter;
16 16
 import java.util.Collections;
17
+import java.util.HashMap;
17 18
 import java.util.List;
19
+import java.util.Map;
18 20
 import java.util.concurrent.ExecutorService;
21
+import java.util.concurrent.TimeUnit;
19 22
 
20 23
 /**
21 24
  * 存储控制器数据
@@ -31,6 +34,16 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
31 34
             .ofPattern("yyyy-MM-dd HH:mm:ss")
32 35
             .withZone(ZoneId.systemDefault());
33 36
 
37
+    private String toDateTime(String timestamp) {
38
+        long ts;
39
+        try {
40
+            ts = Long.parseLong(timestamp);
41
+        } catch (NumberFormatException e) {
42
+            log.info("timestamp 格式错误: {},使用当前时间", timestamp);
43
+            ts = System.currentTimeMillis();
44
+        }
45
+        return DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(ts));
46
+    }
34 47
     @Autowired
35 48
     public MqttGenericConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService,
36 49
                                IotProperties iotProperties,
@@ -96,6 +109,8 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
96 109
 
97 110
     private int processTopics(List<Topics> topics, String controllerId, String timestamp, String fleetId) {
98 111
         int newControllerCount = 0;
112
+        String dateTime = toDateTime(timestamp);
113
+
99 114
         for (Topics topicsMap : topics) {
100 115
             String path = topicsMap.getPath();
101 116
             String name = topicsMap.getName();
@@ -108,27 +123,45 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
108 123
                 log.info("path 中缺少 deviceId,跳过: path={}", path);
109 124
                 continue;
110 125
             }
111
-            Integer count = sysControllerService.selectSysControllerCount(path);
112
-            if (count <= 0) {
113
-                stringRedisTemplate.opsForHash().put(controllerId + ":" + name, "path", path);
114
-                sysControllerService.insertSysController(controllerId, timestamp, fleetId, name, path, deviceId);
115
-                newControllerCount++;
126
+
127
+            String redisKey = controllerId + "_" + name;
128
+            // 读取 Redis 旧数据用于比较
129
+            Map<Object, Object> oldData = stringRedisTemplate.opsForHash().entries(redisKey);
130
+            boolean redisEmpty = (oldData == null || oldData.isEmpty());
131
+
132
+            // 写入最新数据到 Redis
133
+            Map<String, String> newData = new HashMap<>();
134
+            newData.put("path", path);
135
+            newData.put("timestamp", dateTime);
136
+            newData.put("name", name);
137
+            newData.put("deviceId", deviceId);
138
+            stringRedisTemplate.opsForHash().putAll(redisKey, newData);
139
+            stringRedisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
140
+
141
+            if (redisEmpty) {
142
+                // 首次写入:查一次 MySQL 判断新增还是更新(Redis 过期但 MySQL 有数据时走更新)
143
+                Integer count = sysControllerService.selectSysControllerCount(path);
144
+                if (count <= 0) {
145
+                    sysControllerService.insertSysController(controllerId, dateTime, fleetId, name, path, deviceId);
146
+                    newControllerCount++;
147
+                } else {
148
+                    sysControllerService.updateControllerAccept(controllerId, dateTime, fleetId, name, path, deviceId);
149
+                }
116 150
             } else {
117
-                long ts;
118
-                try {
119
-                    ts = Long.parseLong(timestamp);
120
-                } catch (NumberFormatException e) {
121
-                    log.info("timestamp 格式错误: {},使用当前时间", timestamp);
122
-                    ts = System.currentTimeMillis();
151
+                // Redis 有旧数据:比较关键字段(忽略 timestamp),变化时直接更新 MySQL
152
+                if (!equals(oldData.get("path"), path)
153
+                        || !equals(oldData.get("name"), name)
154
+                        || !equals(oldData.get("deviceId"), deviceId)) {
155
+                    sysControllerService.updateControllerAccept(controllerId, dateTime, fleetId, name, path, deviceId);
123 156
                 }
124
-                String date = DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(ts));
125
-                sysControllerService.updateControllerAccept(controllerId, timestamp, fleetId, name, path, deviceId, date);
126 157
             }
127 158
         }
128 159
         return newControllerCount;
129 160
     }
130 161
 
131 162
     private void processCmdTopics(List<Topics> cmdtopics, String controllerId, String timestamp, String fleetId) {
163
+        String dateTime = toDateTime(timestamp);
164
+
132 165
         for (Topics cmdtopicsMap : cmdtopics) {
133 166
             String path = cmdtopicsMap.getPath();
134 167
             String name = cmdtopicsMap.getName();
@@ -136,10 +169,30 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
136 169
                 log.info("CmdTopic 数据不完整,跳过: path={}, name={}", path, name);
137 170
                 continue;
138 171
             }
139
-            Integer count = sysControllerService.selectSysControllerCmdCount(path);
140
-            if (count <= 0) {
141
-                stringRedisTemplate.opsForHash().put(controllerId + "_cmd:" + name, "path", path);
142
-                sysControllerService.insertSysControllerCmd(controllerId, timestamp, fleetId, name, path);
172
+
173
+            String redisKey = controllerId + "_issue_" + name;
174
+            Map<Object, Object> oldData = stringRedisTemplate.opsForHash().entries(redisKey);
175
+            boolean redisEmpty = (oldData == null || oldData.isEmpty());
176
+
177
+            Map<String, String> newData = new HashMap<>();
178
+            newData.put("path", path);
179
+            newData.put("timestamp", dateTime);
180
+            newData.put("name", name);
181
+            stringRedisTemplate.opsForHash().putAll(redisKey, newData);
182
+            stringRedisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
183
+
184
+            if (redisEmpty) {
185
+                Integer count = sysControllerService.selectSysControllerCmdCount(path);
186
+                if (count <= 0) {
187
+                    sysControllerService.insertSysControllerCmd(controllerId, dateTime, fleetId, name, path);
188
+                } else {
189
+                    sysControllerService.updateSysControllerCmd(controllerId, dateTime, fleetId, name, path);
190
+                }
191
+            } else {
192
+                if (!equals(oldData.get("path"), path)
193
+                        || !equals(oldData.get("name"), name)) {
194
+                    sysControllerService.updateSysControllerCmd(controllerId, dateTime, fleetId, name, path);
195
+                }
143 196
             }
144 197
         }
145 198
     }
@@ -154,10 +207,35 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
154 207
             log.info("FaultProt 数据不完整,跳过: path={}, name={}", path, name);
155 208
             return;
156 209
         }
157
-        Integer count = sysControllerService.selectSysControllerFaultCount(path);
158
-        if (count <= 0) {
159
-            stringRedisTemplate.opsForHash().put(controllerId + "_fault:" + name, "path", path);
160
-            sysControllerService.insertSysControllerFault(controllerId, timestamp, fleetId, name, path);
210
+
211
+        String dateTime = toDateTime(timestamp);
212
+        String redisKey = controllerId + "_fault_" + name;
213
+        Map<Object, Object> oldData = stringRedisTemplate.opsForHash().entries(redisKey);
214
+        boolean redisEmpty = (oldData == null || oldData.isEmpty());
215
+
216
+        Map<String, String> newData = new HashMap<>();
217
+        newData.put("path", path);
218
+        newData.put("timestamp", dateTime);
219
+        newData.put("name", name);
220
+        stringRedisTemplate.opsForHash().putAll(redisKey, newData);
221
+        stringRedisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
222
+
223
+        if (redisEmpty) {
224
+            Integer count = sysControllerService.selectSysControllerFaultCount(path);
225
+            if (count <= 0) {
226
+                sysControllerService.insertSysControllerFault(controllerId, dateTime, fleetId, name, path);
227
+            } else {
228
+                sysControllerService.updateSysControllerFault(controllerId, dateTime, fleetId, name, path);
229
+            }
230
+        } else {
231
+            if (!equals(oldData.get("path"), path)
232
+                    || !equals(oldData.get("name"), name)) {
233
+                sysControllerService.updateSysControllerFault(controllerId, dateTime, fleetId, name, path);
234
+            }
161 235
         }
162 236
     }
237
+
238
+    private static boolean equals(Object a, Object b) {
239
+        return (a == b) || (a != null && a.equals(b));
240
+    }
163 241
 }

+ 56
- 11
iot-platform/src/main/java/com/iot/platform/mqtt/MqttStatusConsumer.java Просмотреть файл

@@ -2,30 +2,43 @@ package com.iot.platform.mqtt;
2 2
 
3 3
 import com.alibaba.fastjson.JSONObject;
4 4
 import com.iot.platform.config.IotProperties;
5
+import com.iot.platform.service.SysStatusService;
5 6
 import org.apache.commons.lang3.StringUtils;
6 7
 import org.springframework.beans.factory.annotation.Qualifier;
7 8
 import org.springframework.data.redis.core.StringRedisTemplate;
8 9
 import org.springframework.stereotype.Component;
9 10
 
11
+import java.time.Instant;
12
+import java.time.ZoneId;
13
+import java.time.format.DateTimeFormatter;
14
+import java.util.HashMap;
15
+import java.util.Map;
10 16
 import java.util.concurrent.ExecutorService;
17
+import java.util.concurrent.TimeUnit;
11 18
 
12 19
 /**
13
- * MQTT 状态消息消费者,将控制器状态写入 Redis。
20
+ * MQTT 状态消息消费者,优先写入 Redis,数据变化时同步 MySQL
14 21
  */
15 22
 @Component
16 23
 public class MqttStatusConsumer extends AbstractMqttConsumer {
17 24
 
18 25
     private static final String KEY_CONTROLLER_ID = "controller_id";
19
-    private static final String KEY_FLEET_ID = "fleet_id";
20 26
     private static final String KEY_STATUS = "status";
21 27
 
28
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter
29
+            .ofPattern("yyyy-MM-dd HH:mm:ss")
30
+            .withZone(ZoneId.systemDefault());
31
+
22 32
     private final StringRedisTemplate stringRedisTemplate;
33
+    private final SysStatusService sysStatusService;
23 34
 
24 35
     public MqttStatusConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService,
25 36
                               IotProperties iotProperties,
26
-                              StringRedisTemplate stringRedisTemplate) {
37
+                              StringRedisTemplate stringRedisTemplate,
38
+                              SysStatusService sysStatusService) {
27 39
         super(executorService, iotProperties);
28 40
         this.stringRedisTemplate = stringRedisTemplate;
41
+        this.sysStatusService = sysStatusService;
29 42
     }
30 43
 
31 44
     @Override
@@ -55,18 +68,50 @@ public class MqttStatusConsumer extends AbstractMqttConsumer {
55 68
 
56 69
     private void processStatusPayload(JSONObject payload) {
57 70
         String controllerId = payload.getString(KEY_CONTROLLER_ID);
58
-        String fleetId = payload.getString(KEY_FLEET_ID);
59 71
         String status = payload.getString(KEY_STATUS);
60 72
 
61
-        if (StringUtils.isBlank(controllerId) || StringUtils.isBlank(fleetId)
62
-                || StringUtils.isBlank(status)) {
63
-            log.info("MQTT 状态消息缺少必填字段 | controllerId={} | fleetId={} | status={}",
64
-                    controllerId, fleetId, status);
73
+        if (StringUtils.isBlank(controllerId) || StringUtils.isBlank(status)) {
74
+            log.info("MQTT 状态消息缺少必填字段 | controllerId={} | status={}",
75
+                    controllerId, status);
65 76
             return;
66 77
         }
67 78
 
68
-        String redisKey = controllerId + "_" + fleetId;
69
-        stringRedisTemplate.opsForValue().set(redisKey, status);
70
-        log.debug("状态已写入 Redis | key={} | status={}", redisKey, status);
79
+        String dateTime = toDateTime();
80
+        String redisKey = "status_" + controllerId;
81
+
82
+        // 读取 Redis 旧数据用于比较
83
+        Map<Object, Object> oldData = stringRedisTemplate.opsForHash().entries(redisKey);
84
+        boolean redisEmpty = (oldData == null || oldData.isEmpty());
85
+
86
+        // 写入最新数据到 Redis
87
+        Map<String, String> newData = new HashMap<>();
88
+        newData.put("controllerId", controllerId);
89
+        newData.put("status", status);
90
+        newData.put("createTime", dateTime);
91
+        stringRedisTemplate.opsForHash().putAll(redisKey, newData);
92
+        stringRedisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
93
+
94
+        if (redisEmpty) {
95
+            // 首次写入:查一次 MySQL 判断新增还是更新
96
+            Integer count = sysStatusService.selectStatusCount(controllerId);
97
+            if (count <= 0) {
98
+                sysStatusService.insertSysStatus(controllerId, "", status, dateTime);
99
+            } else {
100
+                sysStatusService.updateStatus(controllerId, "", status, dateTime);
101
+            }
102
+        } else {
103
+            // Redis 有旧数据:比较关键字段(忽略 createTime),变化时直接更新 MySQL
104
+            if (!equals(oldData.get("status"), status)) {
105
+                sysStatusService.updateStatus(controllerId, "", status, dateTime);
106
+            }
107
+        }
108
+    }
109
+
110
+    private String toDateTime() {
111
+        return DATE_TIME_FORMATTER.format(Instant.now());
112
+    }
113
+
114
+    private static boolean equals(Object a, Object b) {
115
+        return (a == b) || (a != null && a.equals(b));
71 116
     }
72 117
 }

+ 2
- 2
iot-platform/src/main/java/com/iot/platform/service/SysAlarmService.java Просмотреть файл

@@ -16,11 +16,11 @@ public class SysAlarmService {
16 16
     @Resource
17 17
     public SysAlarmMapper sysAlarmMapper;
18 18
 
19
-    public void insertAlarm(String tableName,String faultId,String faultdescs,String faultstatus,String createtime,String messageType,String controllerId,String deviceId,String longitude,String latitude){
19
+    public void insertAlarm(String tableName,String faultId,String faultdescs,String faultstatus,String createtime,String messageType,String controllerId,String deviceId){
20 20
         if (tableName == null || !TABLE_NAME_PATTERN.matcher(tableName).matches()) {
21 21
             throw new IllegalArgumentException("非法表名: " + tableName);
22 22
         }
23
-        sysAlarmMapper.insertAlarm(tableName,faultId, faultdescs, faultstatus, createtime, messageType,controllerId,deviceId,longitude,latitude);
23
+        sysAlarmMapper.insertAlarm(tableName,faultId, faultdescs, faultstatus, createtime, messageType,controllerId,deviceId);
24 24
     }
25 25
 
26 26
 }

+ 12
- 2
iot-platform/src/main/java/com/iot/platform/service/SysControllerService.java Просмотреть файл

@@ -45,8 +45,18 @@ public class SysControllerService {
45 45
     }
46 46
 
47 47
     public void updateControllerAccept(String controllerId, String timestamp, String fleetId,
48
-                                       String name, String path, String deviceId, String updateTime) {
49
-        sysControllerMapper.updateControllerAccept(controllerId, timestamp, fleetId, name, path, deviceId, updateTime);
48
+                                       String name, String path, String deviceId) {
49
+        sysControllerMapper.updateControllerAccept(controllerId, timestamp, fleetId, name, path, deviceId);
50
+    }
51
+
52
+    public void updateSysControllerCmd(String controllerId, String timestamp, String fleetId,
53
+                                       String name, String path) {
54
+        sysControllerMapper.updateSysControllerCmd(controllerId, timestamp, fleetId, name, path);
55
+    }
56
+
57
+    public void updateSysControllerFault(String controllerId, String timestamp, String fleetId,
58
+                                         String name, String path) {
59
+        sysControllerMapper.updateSysControllerFault(controllerId, timestamp, fleetId, name, path);
50 60
     }
51 61
 
52 62
     public SysController selectControllerPath(String path) {

+ 8
- 10
iot-platform/src/main/java/com/iot/platform/service/SysFaultService.java Просмотреть файл

@@ -15,29 +15,27 @@ public class SysFaultService {
15 15
     @Resource
16 16
     public SysFaultMapper sysFaultMapper;
17 17
 
18
-    public void insertFault(String faultId,
18
+    public void insertAlertData(String faultId,
19 19
                             String faultdescs,
20 20
                             String faultstatus,
21 21
                             String createtime,
22 22
                             String messageType,
23 23
                             String controllerId,
24 24
                             String deviceId,
25
-                            String longitude,
26
-                            String latitude,
27
-                            String readpeople){
28
-        sysFaultMapper.insertFault(faultId,faultdescs,faultstatus,createtime,messageType,controllerId,deviceId,longitude,latitude,readpeople);
25
+                            String readpeople,
26
+                            String timestamp){
27
+        sysFaultMapper.insertAlertData(faultId,faultdescs,faultstatus,createtime,messageType,controllerId,deviceId,readpeople,timestamp);
29 28
     }
30 29
 
31
-    public void updateFault(
30
+    public void updateAlertData(
32 31
                      String faultstatus,
33 32
                      String messageType,
34
-                     String longitude,
35
-                     String latitude,
36 33
                      String faultdescs,
37 34
                      String controllerId,
38 35
                      String deviceId,
39
-                     String createtime){
40
-        sysFaultMapper.updateFault(faultstatus, messageType, longitude, latitude, faultdescs, controllerId, deviceId, createtime);
36
+                     String createtime,
37
+                     String timestamp){
38
+        sysFaultMapper.updateAlertData(faultstatus, messageType, faultdescs, controllerId, deviceId, createtime, timestamp);
41 39
     }
42 40
 
43 41
     public Integer selectFaultCount(@Param("fleetId")String fleetId){

+ 5
- 0
iot-platform/src/main/java/com/iot/platform/service/SysRealtimeService.java Просмотреть файл

@@ -19,6 +19,11 @@ public class SysRealtimeService {
19 19
     @Resource
20 20
     public SysRealtimeMapper sysRealtimeMapper;
21 21
 
22
+    public boolean tableExists(String tableName) {
23
+        validateTableName(tableName);
24
+        return sysRealtimeMapper.tableExists(tableName) > 0;
25
+    }
26
+
22 27
     public void createRealtime(String tableName) {
23 28
         validateTableName(tableName);
24 29
         log.info("准备创建表: [{}]", tableName);

+ 47
- 21
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Просмотреть файл

@@ -17,6 +17,10 @@ import org.springframework.stereotype.Component;
17 17
 import org.springframework.web.client.RestClientException;
18 18
 import org.springframework.web.client.RestTemplate;
19 19
 
20
+import java.time.Instant;
21
+import java.time.LocalDateTime;
22
+import java.time.ZoneId;
23
+import java.time.format.DateTimeFormatter;
20 24
 import java.util.*;
21 25
 import java.util.concurrent.TimeUnit;
22 26
 
@@ -27,7 +31,6 @@ public class VehicleSyncTask {
27 31
 
28 32
     private final StringRedisTemplate stringRedisTemplate;
29 33
     private final SysRealtimeService sysrealtimeService;
30
-    private final Set<String> createdTables = new HashSet<>();
31 34
 
32 35
     public VehicleSyncTask(StringRedisTemplate stringRedisTemplate,
33 36
                            SysRealtimeService sysrealtimeService) {
@@ -91,13 +94,11 @@ public class VehicleSyncTask {
91 94
                 }
92 95
                 String controllerId = remainder.substring(0, lastUnderscore);
93 96
                 String deviceId = remainder.substring(lastUnderscore + 1);
94
-                String dbPrefix = controllerId.length() >= 2 ? controllerId.substring(0, 2) : controllerId;
95
-                String tableName = "pe_iot_" + dbPrefix;
97
+                String tableName = "pe_iot";
96 98
 
97
-                if (!createdTables.contains(tableName)) {
99
+                if (!sysrealtimeService.tableExists(tableName)) {
98 100
                     try {
99 101
                         sysrealtimeService.createRealtime(tableName);
100
-                        createdTables.add(tableName);
101 102
                     } catch (Exception e) {
102 103
                         log.error("创建表失败: {} | {}", tableName, e.getMessage(), e);
103 104
                         continue;
@@ -111,23 +112,9 @@ public class VehicleSyncTask {
111 112
                     continue;
112 113
                 }
113 114
 
114
-                List<String> existingKeys = sysrealtimeService.selectAllKeys(tableName, controllerId);
115
-                Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
115
+                timestamp = formatTimestamp(timestamp);
116 116
 
117
-                for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
118
-                    String fieldKey = entry.getKey().toString();
119
-                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey) || "controller_id".equals(fieldKey)) {
120
-                        continue;
121
-                    }
122
-                    String fieldValue = getStringValue(dataMap, fieldKey);
123
-                    if (fieldValue == null) continue;
124
-
125
-                    if (existingKeySet.contains(fieldKey)) {
126
-                        sysrealtimeService.updateTables(tableName, createTime, fieldValue, timestamp, fieldKey, controllerId, deviceId);
127
-                    } else {
128
-                        sysrealtimeService.insertTables(tableName, createTime, controllerId, deviceId, timestamp, fieldKey, fieldValue);
129
-                    }
130
-                }
117
+                syncToTable(tableName, dataMap, createTime, timestamp, controllerId, deviceId);
131 118
             } catch (RedisConnectionFailureException e) {
132 119
                 log.error("Redis 连接失败: {} | {}", redisKey, e.getMessage());
133 120
             } catch (DataAccessException e) {
@@ -138,8 +125,47 @@ public class VehicleSyncTask {
138 125
         }
139 126
     }
140 127
 
128
+    private void syncToTable(String tableName, Map<Object, Object> dataMap,
129
+                               String createTime, String timestamp,
130
+                               String controllerId, String deviceId) {
131
+        List<String> existingKeys = sysrealtimeService.selectAllKeys(tableName, controllerId);
132
+        Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
133
+
134
+        for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
135
+            String fieldKey = entry.getKey().toString();
136
+            if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey)
137
+                    || "device_id".equals(fieldKey) || "controller_id".equals(fieldKey)) {
138
+                continue;
139
+            }
140
+            String fieldValue = getStringValue(dataMap, fieldKey);
141
+            if (fieldValue == null) continue;
142
+
143
+            if (existingKeySet.contains(fieldKey)) {
144
+                sysrealtimeService.updateTables(tableName, createTime, fieldValue, timestamp, fieldKey, controllerId, deviceId);
145
+            } else {
146
+                sysrealtimeService.insertTables(tableName, createTime, controllerId, deviceId, timestamp, fieldKey, fieldValue);
147
+            }
148
+        }
149
+    }
150
+
141 151
     private String getStringValue(Map<Object, Object> map, String key) {
142 152
         Object val = map.get(key);
143 153
         return val == null ? null : val.toString().trim();
144 154
     }
155
+
156
+    private static final DateTimeFormatter TS_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
157
+
158
+    private String formatTimestamp(String ts) {
159
+        try {
160
+            long tsLong = Long.parseLong(ts);
161
+            if (ts.length() >= 13) {
162
+                tsLong = tsLong / 1000;
163
+            }
164
+            return LocalDateTime.ofInstant(Instant.ofEpochSecond(tsLong), ZoneId.systemDefault())
165
+                    .format(TS_FORMATTER);
166
+        } catch (NumberFormatException e) {
167
+            log.warn("timestamp 格式无法解析,原样返回: {}", ts);
168
+            return ts;
169
+        }
170
+    }
145 171
 }

+ 2
- 2
iot-platform/src/main/resources/mapper/SysAlarmMapper.xml Просмотреть файл

@@ -14,7 +14,7 @@
14 14
     </resultMap>
15 15
 
16 16
     <insert id="insertAlarm">
17
-        insert into `${tableName}`(faultId,faultdescs,faultstatus,createtime,messageType,controller_id,device_id,longitude,latitude)
18
-        values(#{faultId},#{faultdescs},#{faultstatus},#{createtime},#{messageType},#{controllerId},#{deviceId},#{longitude},#{latitude})
17
+        insert into `${tableName}`(faultId,faultdescs,faultstatus,createtime,messageType,controller_id,device_id)
18
+        values(#{faultId},#{faultdescs},#{faultstatus},#{createtime},#{messageType},#{controllerId},#{deviceId})
19 19
     </insert>
20 20
 </mapper>

+ 19
- 2
iot-platform/src/main/resources/mapper/SysControllerMapper.xml Просмотреть файл

@@ -48,8 +48,25 @@
48 48
             timestamp=#{timestamp},
49 49
             fleet_id=#{fleetId},
50 50
             name=#{name},
51
-            device_id=#{deviceId},
52
-            update_time=#{updateTime}
51
+            device_id=#{deviceId}
52
+        where path = #{path}
53
+    </update>
54
+
55
+    <update id="updateSysControllerCmd">
56
+        update controller_issue
57
+        set controller_id=#{controllerId},
58
+            timestamp=#{timestamp},
59
+            fleet_id=#{fleetId},
60
+            name=#{name}
61
+        where path = #{path}
62
+    </update>
63
+
64
+    <update id="updateSysControllerFault">
65
+        update controller_fault
66
+        set controller_id=#{controllerId},
67
+            timestamp=#{timestamp},
68
+            fleet_id=#{fleetId},
69
+            name=#{name}
53 70
         where path = #{path}
54 71
     </update>
55 72
 

+ 6
- 7
iot-platform/src/main/resources/mapper/SysFaultMapper.xml Просмотреть файл

@@ -5,22 +5,21 @@
5 5
 <mapper namespace="com.iot.platform.mapper.SysFaultMapper">
6 6
     <!-- 通用查询映射结果 -->
7 7
     <resultMap id="BaseResultMap" type="com.iot.platform.domain.SysFault">
8
-        <result column="deviceId" property="device_id"/>
8
+        <result column="deviceId" property="deviceId"/>
9 9
         <result column="timestamp" property="timestamp"/>
10 10
         <result column="type" property="type"/>
11 11
         <result column="code" property="code"/>
12 12
         <result column="desc" property="desc"/>
13 13
     </resultMap>
14 14
 
15
-    <insert id="insertFault">
16
-        insert into alert_data(faultId,faultdescs,faultstatus,createtime,messageType,controller_id,device_id,longitude,latitude,readpeople)
17
-        VALUES(#{faultId},#{faultdescs},#{faultstatus},#{createtime},#{messageType},#{controllerId},#{deviceId},#{longitude},#{latitude},#{readpeople})
15
+    <insert id="insertAlertData">
16
+        insert into alert_data(faultId,faultdescs,faultstatus,createtime,messageType,controller_id,device_id,readpeople,timestamp)
17
+        VALUES(#{faultId},#{faultdescs},#{faultstatus},#{createtime},#{messageType},#{controllerId},#{deviceId},#{readpeople},#{timestamp})
18 18
     </insert>
19
-    <insert id="updateFault">
19
+    <insert id="updateAlertData">
20 20
         update alert_data set faultstatus=#{faultstatus},
21 21
                              messageType=#{messageType},
22
-                             longitude=#{longitude},
23
-                             latitude=#{latitude}
22
+                             timestamp=#{timestamp}
24 23
         where faultdescs=#{faultdescs}
25 24
           and controller_id=#{controllerId}
26 25
           and device_id=#{deviceId}

+ 8
- 0
iot-platform/src/main/resources/mapper/SysRealtimeMapper.xml Просмотреть файл

@@ -5,20 +5,28 @@
5 5
 <mapper namespace="com.iot.platform.mapper.SysRealtimeMapper">
6 6
     <!-- 通用查询映射结果 -->
7 7
     <resultMap id="BaseResultMap" type="com.iot.platform.domain.SysRealtime">
8
+        <id column="id" property="id"/>
8 9
         <result column="createTime" property="createTime"/>
9 10
         <result column="deviceId" property="deviceId"/>
10 11
         <result column="timestamp" property="timestamp"/>
11 12
         <result column="k" property="k"/>
12 13
         <result column="v" property="v"/>
13 14
     </resultMap>
15
+    <select id="tableExists" resultType="int">
16
+        SELECT COUNT(*) FROM information_schema.tables
17
+        WHERE table_schema = DATABASE() AND table_name = #{tableName}
18
+    </select>
19
+
14 20
     <update id="createRealtime">
15 21
         CREATE TABLE IF NOT EXISTS `${tableName}` (
22
+            id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键',
16 23
             create_time VARCHAR(255) NOT NULL COMMENT '时间戳',
17 24
             controller_id VARCHAR(255) NOT NULL COMMENT '控制器id',
18 25
             device_id VARCHAR(255) NOT NULL COMMENT '设备id',
19 26
             timestamp VARCHAR(255) NOT NULL COMMENT '时间戳',
20 27
             k VARCHAR(255) NOT NULL COMMENT 'key',
21 28
             v VARCHAR(255) NOT NULL COMMENT '值',
29
+            PRIMARY KEY (id),
22 30
             INDEX idx_controller_id (controller_id),
23 31
             INDEX idx_controller_device (controller_id, device_id),
24 32
             INDEX idx_controller_k (controller_id, k),

+ 5
- 1
iot-platform/src/test/java/com/iot/platform/mqtt/MqttStatusConsumerTest.java Просмотреть файл

@@ -2,6 +2,7 @@ package com.iot.platform.mqtt;
2 2
 
3 3
 import com.alibaba.fastjson.JSONObject;
4 4
 import com.iot.platform.config.IotProperties;
5
+import com.iot.platform.service.SysStatusService;
5 6
 import org.junit.jupiter.api.BeforeEach;
6 7
 import org.junit.jupiter.api.DisplayName;
7 8
 import org.junit.jupiter.api.Test;
@@ -33,13 +34,16 @@ class MqttStatusConsumerTest {
33 34
     @Mock
34 35
     private IotProperties iotProperties;
35 36
 
37
+    @Mock
38
+    private SysStatusService sysStatusService;
39
+
36 40
     private MqttStatusConsumer mqttStatusConsumer;
37 41
 
38 42
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
39 43
 
40 44
     @BeforeEach
41 45
     void setUp() {
42
-        mqttStatusConsumer = new MqttStatusConsumer(executorService, iotProperties, stringRedisTemplate);
46
+        mqttStatusConsumer = new MqttStatusConsumer(executorService, iotProperties, stringRedisTemplate, sysStatusService);
43 47
         lenient().when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations);
44 48
     }
45 49
 

Загрузка…
Отмена
Сохранить