浏览代码

refactor: migrate MQTT Consumer thread pools to Spring-managed beans

- AbstractMqttConsumer: remove private executorService, use constructor injection
- MqttDynamicConsumer/MqttChargeStationConsumer: remove private core/write executors,
  inject via @Qualifier from ExecutorConfig
- ExecutorConfig: add mqttCoreExecutor, mqttWriteExecutor, abstractConsumerExecutor
  with destroyMethod="shutdown"
- VehicleSyncTask: refine scanKeys() exception handling, add per-record try-catch
  in doUpdateSysCar() loop
- Remove unused jjwt 0.9.1 (CVE-2019-17195) and springfox-boot-starter 3.0.0 deps
mqy20260511
humanleft 5 天前
父节点
当前提交
8c8fe4baed

+ 32
- 0
iot-platform/src/main/java/com/iot/platform/config/ExecutorConfig.java 查看文件

@@ -39,4 +39,36 @@ public class ExecutorConfig {
39 39
                 new ThreadPoolExecutor.CallerRunsPolicy()
40 40
         );
41 41
     }
42
+
43
+    @Bean(destroyMethod = "shutdown")
44
+    public ScheduledExecutorService mqttCoreExecutor() {
45
+        return Executors.newSingleThreadScheduledExecutor(r -> {
46
+            Thread t = new Thread(r, "mqtt-core-scheduler");
47
+            t.setDaemon(true);
48
+            return t;
49
+        });
50
+    }
51
+
52
+    @Bean(destroyMethod = "shutdown")
53
+    public ExecutorService mqttWriteExecutor() {
54
+        return new ThreadPoolExecutor(
55
+                4, 8, 60L, TimeUnit.SECONDS,
56
+                new LinkedBlockingQueue<>(5000),
57
+                r -> {
58
+                    Thread t = new Thread(r, "mqtt-write-" + UUID.randomUUID().toString().substring(0, 4));
59
+                    t.setDaemon(true);
60
+                    return t;
61
+                },
62
+                new ThreadPoolExecutor.CallerRunsPolicy()
63
+        );
64
+    }
65
+
66
+    @Bean(destroyMethod = "shutdown")
67
+    public ExecutorService abstractConsumerExecutor() {
68
+        return Executors.newSingleThreadExecutor(r -> {
69
+            Thread t = new Thread(r, "mqtt-abstract-consumer");
70
+            t.setDaemon(true);
71
+            return t;
72
+        });
73
+    }
42 74
 }

+ 6
- 3
iot-platform/src/main/java/com/iot/platform/mqtt/AbstractMqttConsumer.java 查看文件

@@ -12,7 +12,6 @@ import javax.annotation.PreDestroy;
12 12
 import java.net.InetSocketAddress;
13 13
 import java.net.Socket;
14 14
 import java.util.concurrent.ExecutorService;
15
-import java.util.concurrent.Executors;
16 15
 
17 16
 public abstract class AbstractMqttConsumer {
18 17
 
@@ -21,6 +20,8 @@ public abstract class AbstractMqttConsumer {
21 20
     @Autowired
22 21
     protected IotProperties iotProperties;
23 22
 
23
+    protected final ExecutorService executorService;
24
+
24 25
     private String brokerUrl;
25 26
     private String brokerHost;
26 27
     private int brokerPort;
@@ -35,7 +36,10 @@ public abstract class AbstractMqttConsumer {
35 36
     protected MqttClient mqttClient;
36 37
     protected MqttConnectOptions connOpts;
37 38
     protected volatile boolean isMqttConnected = false;
38
-    protected final ExecutorService executorService = Executors.newSingleThreadExecutor();
39
+
40
+    protected AbstractMqttConsumer(ExecutorService executorService) {
41
+        this.executorService = executorService;
42
+    }
39 43
 
40 44
     protected abstract String getSubscribeTopic();
41 45
 
@@ -168,7 +172,6 @@ public abstract class AbstractMqttConsumer {
168 172
                 mqttClient.close();
169 173
                 log.info("MQTT连接已断开");
170 174
             }
171
-            executorService.shutdown();
172 175
             onDestroy();
173 176
         } catch (MqttException e) {
174 177
             log.error("MQTT断开连接失败:", e);

+ 10
- 41
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java 查看文件

@@ -7,6 +7,7 @@ import org.eclipse.paho.client.mqttv3.*;
7 7
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
8 8
 import com.iot.platform.config.IotProperties;
9 9
 import org.springframework.beans.factory.annotation.Autowired;
10
+import org.springframework.beans.factory.annotation.Qualifier;
10 11
 import org.springframework.context.annotation.DependsOn;
11 12
 import org.springframework.data.redis.core.StringRedisTemplate;
12 13
 import org.springframework.stereotype.Component;
@@ -56,27 +57,15 @@ public class MqttChargeStationConsumer {
56 57
     private final AtomicBoolean isConnected = new AtomicBoolean(false);
57 58
     private final Object lock = new Object();
58 59
 
59
-    // 核心调度线程池
60
-    private final ScheduledExecutorService coreExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
61
-        Thread t = new Thread(r);
62
-        t.setName("mqtt-core-scheduler");
63
-        t.setDaemon(false); // ← 改这里!
64
-        t.setPriority(Thread.NORM_PRIORITY - 1);
65
-        return t;
66
-    });
67
-
68
-    // TDengine 写入线程池
69
-    private final ExecutorService writeExecutor = new ThreadPoolExecutor(
70
-            4, 8, 60L, TimeUnit.SECONDS,
71
-            new LinkedBlockingQueue<>(5000),
72
-            r -> {
73
-                Thread t = new Thread(r);
74
-                t.setName("td-write-worker-" + UUID.randomUUID().toString().substring(0, 4));
75
-                t.setDaemon(false); // ← 改这里!
76
-                return t;
77
-            },
78
-            new ThreadPoolExecutor.CallerRunsPolicy()
79
-    );
60
+    private final ScheduledExecutorService coreExecutor;
61
+    private final ExecutorService writeExecutor;
62
+
63
+    @Autowired
64
+    public MqttChargeStationConsumer(@Qualifier("mqttCoreExecutor") ScheduledExecutorService coreExecutor,
65
+                                     @Qualifier("mqttWriteExecutor") ExecutorService writeExecutor) {
66
+        this.coreExecutor = coreExecutor;
67
+        this.writeExecutor = writeExecutor;
68
+    }
80 69
 
81 70
     @PostConstruct
82 71
     @DependsOn({"sysControllerService", "tdengineService"})
@@ -459,8 +448,6 @@ public class MqttChargeStationConsumer {
459 448
 
460 449
     public void disconnect() {
461 450
         synchronized (lock) {
462
-            shutdownExecutor(writeExecutor, "TDengine 写入");
463
-
464 451
             if (mqttClient != null) {
465 452
                 try {
466 453
                     if (mqttClient.isConnected()) {
@@ -480,27 +467,9 @@ public class MqttChargeStationConsumer {
480 467
         }
481 468
     }
482 469
 
483
-    private void shutdownExecutor(ExecutorService executor, String name) {
484
-        if (executor == null || executor.isShutdown()) return;
485
-        try {
486
-            executor.shutdown();
487
-            if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
488
-                List<Runnable> remaining = executor.shutdownNow();
489
-                log.error(name + " 线程池强制关闭,剩余任务数:" + remaining.size());
490
-            } else {
491
-                log.info(name + " 线程池已优雅关闭");
492
-            }
493
-        } catch (InterruptedException e) {
494
-            executor.shutdownNow();
495
-            Thread.currentThread().interrupt();
496
-            log.error(name + " 线程池关闭被中断");
497
-        }
498
-    }
499
-
500 470
     @PreDestroy
501 471
     public void destroy() {
502 472
         log.info(">>> 服务正在关闭...");
503 473
         disconnect();
504
-        shutdownExecutor(coreExecutor, "MQTT 核心");
505 474
     }
506 475
 }

+ 10
- 24
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java 查看文件

@@ -9,6 +9,7 @@ import org.eclipse.paho.client.mqttv3.*;
9 9
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
10 10
 import com.iot.platform.config.IotProperties;
11 11
 import org.springframework.beans.factory.annotation.Autowired;
12
+import org.springframework.beans.factory.annotation.Qualifier;
12 13
 import org.springframework.context.annotation.DependsOn;
13 14
 import org.springframework.data.redis.core.StringRedisTemplate;
14 15
 import org.springframework.stereotype.Component;
@@ -64,27 +65,15 @@ public class MqttDynamicConsumer {
64 65
     private final AtomicBoolean isConnected = new AtomicBoolean(false);
65 66
     private final Object lock = new Object();
66 67
 
67
-    // 核心调度线程池
68
-    private final ScheduledExecutorService coreExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
69
-        Thread t = new Thread(r);
70
-        t.setName("mqtt-core-scheduler");
71
-        t.setDaemon(false); // ← 改这里!
72
-        t.setPriority(Thread.NORM_PRIORITY - 1);
73
-        return t;
74
-    });
75
-
76
-    // TDengine 写入线程池
77
-    private final ExecutorService writeExecutor = new ThreadPoolExecutor(
78
-            4, 8, 60L, TimeUnit.SECONDS,
79
-            new LinkedBlockingQueue<>(5000),
80
-            r -> {
81
-                Thread t = new Thread(r);
82
-                t.setName("td-write-worker-" + UUID.randomUUID().toString().substring(0, 4));
83
-                t.setDaemon(false); // ← 改这里!
84
-                return t;
85
-            },
86
-            new ThreadPoolExecutor.CallerRunsPolicy()
87
-    );
68
+    private final ScheduledExecutorService coreExecutor;
69
+    private final ExecutorService writeExecutor;
70
+
71
+    @Autowired
72
+    public MqttDynamicConsumer(@Qualifier("mqttCoreExecutor") ScheduledExecutorService coreExecutor,
73
+                               @Qualifier("mqttWriteExecutor") ExecutorService writeExecutor) {
74
+        this.coreExecutor = coreExecutor;
75
+        this.writeExecutor = writeExecutor;
76
+    }
88 77
 
89 78
     @PostConstruct
90 79
     @DependsOn({"sysControllerService", "tdengineService"})
@@ -522,8 +511,6 @@ public class MqttDynamicConsumer {
522 511
 
523 512
     public void disconnect() {
524 513
         synchronized (lock) {
525
-            shutdownExecutor(writeExecutor, "TDengine 写入");
526
-
527 514
             if (mqttClient != null) {
528 515
                 try {
529 516
                     if (mqttClient.isConnected()) {
@@ -564,6 +551,5 @@ public class MqttDynamicConsumer {
564 551
     public void destroy() {
565 552
         log.info(">>> 服务正在关闭...");
566 553
         disconnect();
567
-        shutdownExecutor(coreExecutor, "MQTT 核心");
568 554
     }
569 555
 }

+ 4
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttFaultConsumer.java 查看文件

@@ -6,6 +6,7 @@ import com.iot.platform.domain.SysFault;
6 6
 import com.iot.platform.service.*;
7 7
 import com.iot.platform.common.utils.NumericIdGenerator;
8 8
 import org.springframework.beans.factory.annotation.Autowired;
9
+import org.springframework.beans.factory.annotation.Qualifier;
9 10
 import org.springframework.stereotype.Component;
10 11
 import org.springframework.web.client.RestTemplate;
11 12
 
@@ -43,7 +44,9 @@ public class MqttFaultConsumer extends AbstractMqttConsumer {
43 44
     private final ExecutorService mqttFaultExecutor;
44 45
 
45 46
     @Autowired
46
-    public MqttFaultConsumer(ExecutorService mqttFaultExecutor) {
47
+    public MqttFaultConsumer(@Qualifier("mqttFaultExecutor") ExecutorService mqttFaultExecutor,
48
+                             @Qualifier("abstractConsumerExecutor") ExecutorService executorService) {
49
+        super(executorService);
47 50
         this.mqttFaultExecutor = mqttFaultExecutor;
48 51
     }
49 52
 

+ 7
- 0
iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java 查看文件

@@ -5,10 +5,12 @@ import com.iot.platform.domain.ControllerData;
5 5
 import com.iot.platform.domain.topics;
6 6
 import com.iot.platform.service.SysControllerService;
7 7
 import org.springframework.beans.factory.annotation.Autowired;
8
+import org.springframework.beans.factory.annotation.Qualifier;
8 9
 import org.springframework.data.redis.core.StringRedisTemplate;
9 10
 import org.springframework.stereotype.Component;
10 11
 
11 12
 import java.util.List;
13
+import java.util.concurrent.ExecutorService;
12 14
 
13 15
 /**
14 16
  * 存储控制器数据
@@ -25,6 +27,11 @@ public class MqttGenericConsumer extends AbstractMqttConsumer {
25 27
 
26 28
     private final ObjectMapper objectMapper = new ObjectMapper();
27 29
 
30
+    @Autowired
31
+    public MqttGenericConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService) {
32
+        super(executorService);
33
+    }
34
+
28 35
     @Override
29 36
     protected String getSubscribeTopic() {
30 37
         return "+/generics";

+ 7
- 0
iot-platform/src/main/java/com/iot/platform/mqtt/MqttStatusConsumer.java 查看文件

@@ -4,12 +4,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
4 4
 import com.iot.platform.service.SysControllerService;
5 5
 import com.iot.platform.service.SysStatusService;
6 6
 import org.springframework.beans.factory.annotation.Autowired;
7
+import org.springframework.beans.factory.annotation.Qualifier;
7 8
 import org.springframework.data.redis.core.StringRedisTemplate;
8 9
 import org.springframework.stereotype.Component;
9 10
 
10 11
 import java.time.LocalDateTime;
11 12
 import java.time.format.DateTimeFormatter;
12 13
 import java.util.Map;
14
+import java.util.concurrent.ExecutorService;
13 15
 
14 16
 /**
15 17
  * 存储控制器状态数据
@@ -26,6 +28,11 @@ public class MqttStatusConsumer extends AbstractMqttConsumer {
26 28
 
27 29
     private final ObjectMapper objectMapper = new ObjectMapper();
28 30
 
31
+    @Autowired
32
+    public MqttStatusConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService) {
33
+        super(executorService);
34
+    }
35
+
29 36
     @Override
30 37
     protected String getSubscribeTopic() {
31 38
         return "+/status";

+ 24
- 16
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java 查看文件

@@ -79,25 +79,31 @@ public class VehicleSyncTask {
79 79
     private void doUpdateSysCar() {
80 80
         List<SysCar> sysCarList = sysCarService.selectcontrollerId();
81 81
         for (SysCar sysCar : sysCarList) {
82
-            if (sysCar.getControllerId() == null || sysCar.getControllerId().isEmpty()) {
83
-                continue;
84
-            }
85
-            SysDevice latitude = sysDeviceService.selectsysdevice(sysCar.getControllerId(), "纬度");
86
-            SysDevice longitude = sysDeviceService.selectsysdevice(sysCar.getControllerId(), "经度");
82
+            try {
83
+                if (sysCar.getControllerId() == null || sysCar.getControllerId().isEmpty()) {
84
+                    continue;
85
+                }
86
+                SysDevice latitude = sysDeviceService.selectsysdevice(sysCar.getControllerId(), "纬度");
87
+                SysDevice longitude = sysDeviceService.selectsysdevice(sysCar.getControllerId(), "经度");
87 88
 
88
-            String redisKeyPattern = "workorder:coordinate:" + sysCar.getControllerId() + ":*";
89
-            Set<String> keys = scanKeys(redisKeyPattern);
89
+                String redisKeyPattern = "workorder:coordinate:" + sysCar.getControllerId() + ":*";
90
+                Set<String> keys = scanKeys(redisKeyPattern);
90 91
 
91
-            if (keys == null || keys.isEmpty()) {
92
-                updateCarPosition(sysCar, latitude, longitude);
93
-            } else {
94
-                for (String key : keys) {
95
-                    Map<Object, Object> coordinateMap = stringRedisTemplate.opsForHash().entries(key);
96
-                    if (coordinateMap.get("latitude").equals(latitude.getV()) && coordinateMap.get("longitude").equals(longitude.getV())) {
97
-                        continue;
98
-                    }
92
+                if (keys == null || keys.isEmpty()) {
99 93
                     updateCarPosition(sysCar, latitude, longitude);
94
+                } else {
95
+                    for (String key : keys) {
96
+                        Map<Object, Object> coordinateMap = stringRedisTemplate.opsForHash().entries(key);
97
+                        if (coordinateMap.get("latitude").equals(latitude.getV()) && coordinateMap.get("longitude").equals(longitude.getV())) {
98
+                            continue;
99
+                        }
100
+                        updateCarPosition(sysCar, latitude, longitude);
101
+                    }
100 102
                 }
103
+            } catch (DataAccessException e) {
104
+                log.error("更新车辆位置失败 carId={}: {}", sysCar.getCarId(), e.getMessage(), e);
105
+            } catch (Exception e) {
106
+                log.error("更新车辆位置异常 carId={}: {}", sysCar.getCarId(), e.getMessage(), e);
101 107
             }
102 108
         }
103 109
     }
@@ -130,8 +136,10 @@ public class VehicleSyncTask {
130 136
                 cursor.close();
131 137
                 return null;
132 138
             });
139
+        } catch (RedisConnectionFailureException e) {
140
+            log.warn("Redis SCAN 连接失败 pattern={}: {}", pattern, e.getMessage());
133 141
         } catch (Exception e) {
134
-            log.error("Redis SCAN失败 pattern={}: {}", pattern, e.getMessage());
142
+            log.error("Redis SCAN 失败 pattern={}: {}", pattern, e.getMessage(), e);
135 143
         }
136 144
         return keys;
137 145
     }

+ 7
- 2
iot-platform/src/test/java/com/iot/platform/mqtt/MqttGenericConsumerTest.java 查看文件

@@ -3,21 +3,26 @@ package com.iot.platform.mqtt;
3 3
 import org.junit.jupiter.api.DisplayName;
4 4
 import org.junit.jupiter.api.Test;
5 5
 
6
+import java.util.concurrent.ExecutorService;
7
+import java.util.concurrent.Executors;
8
+
6 9
 import static org.assertj.core.api.Assertions.assertThat;
7 10
 
8 11
 class MqttGenericConsumerTest {
9 12
 
13
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
14
+
10 15
     @Test
11 16
     @DisplayName("getSubscribeTopic 应返回 +/generics")
12 17
     void getSubscribeTopic_returnsCorrectValue() {
13
-        MqttGenericConsumer consumer = new MqttGenericConsumer();
18
+        MqttGenericConsumer consumer = new MqttGenericConsumer(executorService);
14 19
         assertThat(consumer.getSubscribeTopic()).isEqualTo("+/generics");
15 20
     }
16 21
 
17 22
     @Test
18 23
     @DisplayName("generateClientId 应包含 mqttx 前缀")
19 24
     void generateClientId_containsPrefix() {
20
-        MqttGenericConsumer consumer = new MqttGenericConsumer();
25
+        MqttGenericConsumer consumer = new MqttGenericConsumer(executorService);
21 26
         String clientId = consumer.generateClientId();
22 27
         assertThat(clientId).startsWith("mqttx_e216fbf16");
23 28
     }

+ 0
- 23
pom.xml 查看文件

@@ -20,11 +20,9 @@
20 20
         <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
21 21
         <spring-boot.version>2.5.15</spring-boot.version>
22 22
         <druid.version>1.2.23</druid.version>
23
-        <swagger.version>3.0.0</swagger.version>
24 23
         <pagehelper.boot.version>1.4.7</pagehelper.boot.version>
25 24
         <fastjson.version>2.0.57</fastjson.version>
26 25
         <commons.io.version>2.19.0</commons.io.version>
27
-        <jwt.version>0.9.1</jwt.version>
28 26
         <!-- override dependency version -->
29 27
         <tomcat.version>9.0.106</tomcat.version>
30 28
         <logback.version>1.2.13</logback.version>
@@ -109,19 +107,6 @@
109 107
                 <version>${pagehelper.boot.version}</version>
110 108
             </dependency>
111 109
 
112
-            <!-- Swagger3依赖 -->
113
-            <dependency>
114
-                <groupId>io.springfox</groupId>
115
-                <artifactId>springfox-boot-starter</artifactId>
116
-                <version>${swagger.version}</version>
117
-                <exclusions>
118
-                    <exclusion>
119
-                        <groupId>io.swagger</groupId>
120
-                        <artifactId>swagger-models</artifactId>
121
-                    </exclusion>
122
-                </exclusions>
123
-            </dependency>
124
-
125 110
             <!-- io常用工具类 -->
126 111
             <dependency>
127 112
                 <groupId>commons-io</groupId>
@@ -129,14 +114,6 @@
129 114
                 <version>${commons.io.version}</version>
130 115
             </dependency>
131 116
 
132
-            <!-- Token生成与解析-->
133
-            <dependency>
134
-                <groupId>io.jsonwebtoken</groupId>
135
-                <artifactId>jjwt</artifactId>
136
-                <version>${jwt.version}</version>
137
-            </dependency>
138
-
139
-
140 117
         </dependencies>
141 118
     </dependencyManagement>
142 119
 

正在加载...
取消
保存