Преглед на файлове

fix(P0): 5项安全与稳定性修复

- MqttGenericConsumer: destroy/init → refreshMqttSubscription() 避免破坏Spring生命周期
- MqttGenericConsumer: 添加 topics/cmdtopics/faultprot 空值保护防止NPE
- MqttGenericConsumer: 合并3处重复 persist 调用到方法末尾
- VehicleSyncTask: 修复 coordinateMap.get 的NPE风险(先判空再equals)
- VehicleSyncTask: 修复逗号分隔逻辑错误(i > size-1 → i < size-1)
- MqttDynamicConsumer: 删除废弃的 shutdownExecutor 死代码
mqy20260511
humanleft преди 4 дни
родител
ревизия
d0cff8b7f2

+ 0
- 17
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Целия файл

@@ -530,23 +530,6 @@ public class MqttDynamicConsumer {
530 530
         }
531 531
     }
532 532
 
533
-    private void shutdownExecutor(ExecutorService executor, String name) {
534
-        if (executor == null || executor.isShutdown()) return;
535
-        try {
536
-            executor.shutdown();
537
-            if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
538
-                List<Runnable> remaining = executor.shutdownNow();
539
-                log.error(name + " 线程池强制关闭,剩余任务数:" + remaining.size());
540
-            } else {
541
-                log.info(name + " 线程池已优雅关闭");
542
-            }
543
-        } catch (InterruptedException e) {
544
-            executor.shutdownNow();
545
-            Thread.currentThread().interrupt();
546
-            log.error(name + " 线程池关闭被中断");
547
-        }
548
-    }
549
-
550 533
     @PreDestroy
551 534
     public void destroy() {
552 535
         log.info(">>> 服务正在关闭...");

+ 15
- 11
iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java Целия файл

@@ -10,6 +10,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
10 10
 import org.springframework.stereotype.Component;
11 11
 
12 12
 import java.text.SimpleDateFormat;
13
+import java.util.Collections;
13 14
 import java.util.Date;
14 15
 import java.util.List;
15 16
 import java.util.concurrent.ExecutorService;
@@ -55,15 +56,17 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
55 56
         String timestamp = weather.getTimestamp();
56 57
         String fleetId = weather.getFleet_id();
57 58
         String controllerId = weather.getController_id();
59
+
58 60
         List<topics> topics = weather.getTopics();
61
+        if (topics == null) topics = Collections.emptyList();
59 62
         List<topics> cmdtopics = weather.getCmd_topics();
63
+        if (cmdtopics == null) cmdtopics = Collections.emptyList();
60 64
         topics faultprot = weather.getFault_prot();
61 65
 
62 66
         Integer controllercountcount = 0;
63 67
         for (topics topicsMap : topics) {
64 68
             Integer count = sysControllerService.selectsyscontrollercount(topicsMap.getPath());
65 69
             if (count <= 0) {
66
-                stringRedisTemplate.persist(controllerId);
67 70
                 stringRedisTemplate.opsForHash().put(controllerId + ":" + topicsMap.getName(), "path", topicsMap.getPath());
68 71
                 sysControllerService.insertsyscontroller(controllerId, timestamp, fleetId, topicsMap.getName(), topicsMap.getPath(), topicsMap.getPath().split("/")[1]);
69 72
                 controllercountcount++;
@@ -74,27 +77,28 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
74 77
             }
75 78
         }
76 79
 
77
-        Integer controllercountcmdcount = 0;
78 80
         for (topics cmdtopicsMap : cmdtopics) {
79 81
             Integer count = sysControllerService.selectsyscontrollercountcmd(cmdtopicsMap.getPath());
80 82
             if (count <= 0) {
81 83
                 stringRedisTemplate.opsForHash().put(controllerId + "_cmd:" + cmdtopicsMap.getName(), "path", cmdtopicsMap.getPath());
82
-                stringRedisTemplate.persist(controllerId);
83 84
                 sysControllerService.insertsyscontrollercmd(controllerId, timestamp, fleetId, cmdtopicsMap.getName(), cmdtopicsMap.getPath());
84
-                controllercountcmdcount++;
85 85
             }
86 86
         }
87 87
 
88
-        Integer count = sysControllerService.selectsyscontrollercountfault(faultprot.getPath());
89
-        if (count <= 0) {
90
-            stringRedisTemplate.opsForHash().put(controllerId + "_fault:" + faultprot.getName(), "path", faultprot.getPath());
91
-            sysControllerService.insertsyscontrollerfault(controllerId, timestamp, fleetId, faultprot.getName(), faultprot.getPath());
92
-            stringRedisTemplate.persist(controllerId);
88
+        if (faultprot != null) {
89
+            Integer count = sysControllerService.selectsyscontrollercountfault(faultprot.getPath());
90
+            if (count <= 0) {
91
+                stringRedisTemplate.opsForHash().put(controllerId + "_fault:" + faultprot.getName(), "path", faultprot.getPath());
92
+                sysControllerService.insertsyscontrollerfault(controllerId, timestamp, fleetId, faultprot.getName(), faultprot.getPath());
93
+            }
93 94
         }
94 95
 
96
+        // 统一 persist controllerId,确保其不会被过期删除
97
+        stringRedisTemplate.persist(controllerId);
98
+
95 99
         if (controllercountcount > 0) {
96
-            messageListenerService2.destroy();
97
-            messageListenerService2.initMqttConnection();
100
+            // TODO: 后续建议改为 Spring Event 解耦,避免直接调用其他 Bean 的方法
101
+            messageListenerService2.refreshMqttSubscription();
98 102
         }
99 103
     }
100 104
 }

+ 5
- 3
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Целия файл

@@ -94,7 +94,9 @@ public class VehicleSyncTask {
94 94
                 } else {
95 95
                     for (String key : keys) {
96 96
                         Map<Object, Object> coordinateMap = stringRedisTemplate.opsForHash().entries(key);
97
-                        if (coordinateMap.get("latitude").equals(latitude.getV()) && coordinateMap.get("longitude").equals(longitude.getV())) {
97
+                        Object cachedLat = coordinateMap.get("latitude");
98
+                        Object cachedLon = coordinateMap.get("longitude");
99
+                        if (cachedLat != null && cachedLat.equals(latitude.getV()) && cachedLon != null && cachedLon.equals(longitude.getV())) {
98 100
                             continue;
99 101
                         }
100 102
                         updateCarPosition(sysCar, latitude, longitude);
@@ -193,7 +195,7 @@ public class VehicleSyncTask {
193 195
                             String fieldvalue = entry.getValue().toString();
194 196
                             if (sysDeviceControlList.get(i).getControllerName().equals(fieldKey)) {
195 197
                                 keyvalue.append(fieldKey).append("=/'").append(fieldvalue).append("/'");
196
-                                if (i > sysDeviceControlList.size() - 1) {
198
+                                if (i < sysDeviceControlList.size() - 1) {
197 199
                                     keyvalue.append(",");
198 200
                                 }
199 201
                             }
@@ -210,7 +212,7 @@ public class VehicleSyncTask {
210 212
                             if (sysDeviceControlList.get(i).getControllerName().equals(fieldKey)) {
211 213
                                 key.append(fieldKey);
212 214
                                 value.append(fieldvalue);
213
-                                if (i > sysDeviceControlList.size() - 1) {
215
+                                if (i < sysDeviceControlList.size() - 1) {
214 216
                                     key.append(",");
215 217
                                     value.append(",");
216 218
                                 }

Loading…
Отказ
Запис