Преглед изворни кода

refactor(P0): 提取 AbstractDynamicMqttConsumer 基类消除400+行重复

- 新建 AbstractDynamicMqttConsumer 抽象基类,抽取连接/订阅/重连公共逻辑
- MqttDynamicConsumer extends 基类,仅保留 fetchTopics + processMessage
- MqttChargeStationConsumer extends 基类,仅保留 fetchTopics + processMessage
- 配置化 ChargeStation topic(IotProperties + application.yml)
- 修复 @DependsOn 位置(方法→类级别)
- 从 disconnect() 中移除 tdengineService.close()(Spring单例不应被消费者关闭)
- 基类日志统一使用 {} 占位符,移除 emoji
mqy20260511
humanleft пре 4 дана
родитељ
комит
66e22bd4a7

+ 9
- 0
iot-platform/src/main/java/com/iot/platform/config/IotProperties.java Прегледај датотеку

@@ -56,6 +56,7 @@ public class IotProperties {
56 56
         private String brokerUrl = "tcp://47.104.204.180:1883";
57 57
         private String username = "";
58 58
         private String password = "";
59
+        private String chargeStationTopic = "station/ChargeStation/device/+/post/json";
59 60
 
60 61
         public String getBrokerUrl() {
61 62
             return brokerUrl;
@@ -80,6 +81,14 @@ public class IotProperties {
80 81
         public void setPassword(String password) {
81 82
             this.password = password;
82 83
         }
84
+
85
+        public String getChargeStationTopic() {
86
+            return chargeStationTopic;
87
+        }
88
+
89
+        public void setChargeStationTopic(String chargeStationTopic) {
90
+            this.chargeStationTopic = chargeStationTopic;
91
+        }
83 92
     }
84 93
 
85 94
     /**

+ 442
- 0
iot-platform/src/main/java/com/iot/platform/mqtt/AbstractDynamicMqttConsumer.java Прегледај датотеку

@@ -0,0 +1,442 @@
1
+package com.iot.platform.mqtt;
2
+
3
+import com.fasterxml.jackson.core.type.TypeReference;
4
+import com.fasterxml.jackson.databind.ObjectMapper;
5
+import com.iot.platform.config.IotProperties;
6
+import org.eclipse.paho.client.mqttv3.*;
7
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
8
+import org.slf4j.Logger;
9
+import org.slf4j.LoggerFactory;
10
+import org.springframework.beans.factory.annotation.Autowired;
11
+
12
+import javax.annotation.PostConstruct;
13
+import javax.annotation.PreDestroy;
14
+import java.net.InetSocketAddress;
15
+import java.net.Socket;
16
+import java.util.*;
17
+import java.util.concurrent.*;
18
+import java.util.concurrent.atomic.AtomicBoolean;
19
+
20
+/**
21
+ * 动态 Topic MQTT 消费者基类。
22
+ * 抽取 MqttDynamicConsumer 与 MqttChargeStationConsumer 的公共连接/订阅管理逻辑,
23
+ * 子类只需实现 {@link #fetchTopics()} 和 {@link #processMessage(String, String)}。
24
+ */
25
+public abstract class AbstractDynamicMqttConsumer {
26
+
27
+    protected final Logger log = LoggerFactory.getLogger(getClass());
28
+
29
+    protected final IotProperties iotProperties;
30
+    protected final ScheduledExecutorService coreExecutor;
31
+    protected final ExecutorService writeExecutor;
32
+
33
+    // MQTT 配置
34
+    protected String brokerUrl;
35
+    protected String brokerHost;
36
+    protected int brokerPort;
37
+    protected String mqttUsername;
38
+    protected String mqttPassword;
39
+
40
+    private static final int QOS = 1;
41
+    private static final int CONNECT_TIMEOUT = 3000;
42
+    private static final int RECONNECT_INTERVAL = 5000;
43
+    private static final int MAX_BATCH_SIZE = 50;
44
+
45
+    protected MqttClient mqttClient;
46
+    protected MqttConnectOptions connOpts;
47
+    protected final Set<String> currentTopicSet = new CopyOnWriteArraySet<>();
48
+    protected final AtomicBoolean isConnected = new AtomicBoolean(false);
49
+    protected final Object lock = new Object();
50
+
51
+    protected AbstractDynamicMqttConsumer(IotProperties iotProperties,
52
+                                          ScheduledExecutorService coreExecutor,
53
+                                          ExecutorService writeExecutor) {
54
+        this.iotProperties = iotProperties;
55
+        this.coreExecutor = coreExecutor;
56
+        this.writeExecutor = writeExecutor;
57
+    }
58
+
59
+    /**
60
+     * 子类实现:获取当前需要订阅的 Topic 列表。
61
+     */
62
+    protected abstract List<String> fetchTopics();
63
+
64
+    /**
65
+     * 子类实现:处理收到的 MQTT 消息内容。
66
+     *
67
+     * @param content 消息原始字符串(JSON)
68
+     * @param topic   消息所属 Topic
69
+     */
70
+    protected abstract void processMessage(String content, String topic) throws Exception;
71
+
72
+    @PostConstruct
73
+    public void initMqttConnection() {
74
+        this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
75
+        String brokerAddr = this.brokerUrl.replace("tcp://", "");
76
+        int colonIdx = brokerAddr.lastIndexOf(':');
77
+        this.brokerHost = brokerAddr.substring(0, colonIdx);
78
+        this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
79
+        this.mqttUsername = iotProperties.getMqtt().getUsername();
80
+        this.mqttPassword = iotProperties.getMqtt().getPassword();
81
+
82
+        log.info("开始初始化 MQTT 动态订阅服务...");
83
+
84
+        CompletableFuture<Boolean> connectFuture = CompletableFuture.supplyAsync(() -> {
85
+            int initRetry = 0;
86
+            while (initRetry < 3 && !isConnected.get()) {
87
+                try {
88
+                    if (refreshMqttSubscription()) {
89
+                        log.info("MQTT 动态订阅初始化成功");
90
+                        return true;
91
+                    }
92
+                } catch (Exception e) {
93
+                    log.error("MQTT 启动初始化失败(第{}次)", initRetry + 1, e);
94
+                }
95
+                initRetry++;
96
+                try {
97
+                    Thread.sleep(RECONNECT_INTERVAL * initRetry);
98
+                } catch (InterruptedException ex) {
99
+                    Thread.currentThread().interrupt();
100
+                    break;
101
+                }
102
+            }
103
+            return false;
104
+        }, coreExecutor);
105
+
106
+        try {
107
+            Boolean connected = connectFuture.get(10, TimeUnit.SECONDS);
108
+            if (!connected) {
109
+                log.error("MQTT 启动失败(超时),触发后台重连机制");
110
+                triggerReconnect();
111
+            }
112
+        } catch (TimeoutException e) {
113
+            log.error("MQTT 初始化超时(10秒),触发后台重连机制");
114
+            triggerReconnect();
115
+        } catch (InterruptedException e) {
116
+            Thread.currentThread().interrupt();
117
+            log.error("MQTT 初始化被中断");
118
+            triggerReconnect();
119
+        } catch (ExecutionException e) {
120
+            log.error("MQTT 初始化执行异常", e.getCause());
121
+            triggerReconnect();
122
+        }
123
+    }
124
+
125
+    private void initMqttConnectOptions() {
126
+        if (connOpts != null) return;
127
+        connOpts = new MqttConnectOptions();
128
+        connOpts.setCleanSession(false);
129
+        connOpts.setAutomaticReconnect(true);
130
+        connOpts.setConnectionTimeout(10);
131
+        connOpts.setKeepAliveInterval(60);
132
+        connOpts.setMaxInflight(10);
133
+        connOpts.setMaxReconnectDelay(30);
134
+        connOpts.setUserName(mqttUsername);
135
+        connOpts.setPassword(mqttPassword.toCharArray());
136
+    }
137
+
138
+    public boolean refreshMqttSubscription() {
139
+        synchronized (lock) {
140
+            try {
141
+                initMqttConnectOptions();
142
+                List<String> latestTopicList = fetchTopics();
143
+                log.info("查询到 Topic 列表: {}", latestTopicList);
144
+
145
+                if (latestTopicList == null || latestTopicList.isEmpty()) {
146
+                    log.error("未查询到 Topic,取消所有订阅");
147
+                    unsubscribeAll();
148
+                    currentTopicSet.clear();
149
+                    return false;
150
+                }
151
+
152
+                Set<String> latestTopicSet = new HashSet<>();
153
+                for (String topic : latestTopicList) {
154
+                    if (topic != null && !topic.trim().isEmpty()) {
155
+                        latestTopicSet.add(topic.trim());
156
+                    }
157
+                }
158
+
159
+                if (latestTopicSet.equals(currentTopicSet)) {
160
+                    log.info("Topic 列表无变化,无需刷新订阅关系");
161
+                    if (!isConnected.get() && !latestTopicSet.isEmpty()) {
162
+                        return connectAndSubscribe(latestTopicSet);
163
+                    }
164
+                    return true;
165
+                }
166
+
167
+                Set<String> topicsToAdd = new HashSet<>(latestTopicSet);
168
+                topicsToAdd.removeAll(currentTopicSet);
169
+                Set<String> topicsToRemove = new HashSet<>(currentTopicSet);
170
+                topicsToRemove.removeAll(latestTopicSet);
171
+
172
+                if (!isConnected.get()) {
173
+                    return connectAndSubscribe(latestTopicSet);
174
+                } else {
175
+                    if (!topicsToRemove.isEmpty()) unsubscribeTopics(topicsToRemove);
176
+                    if (!topicsToAdd.isEmpty()) subscribeTopics(topicsToAdd);
177
+                }
178
+
179
+                currentTopicSet.clear();
180
+                currentTopicSet.addAll(latestTopicSet);
181
+                log.info("MQTT 订阅刷新完成,当前数量:{}", currentTopicSet.size());
182
+                return true;
183
+
184
+            } catch (Exception e) {
185
+                log.error("MQTT 订阅刷新失败", e);
186
+                return false;
187
+            }
188
+        }
189
+    }
190
+
191
+    private boolean connectAndSubscribe(Set<String> topicSet) {
192
+        synchronized (lock) {
193
+            if (isConnected.get() && mqttClient != null && mqttClient.isConnected()) {
194
+                return true;
195
+            }
196
+            try {
197
+                if (!checkServerAvailability()) {
198
+                    log.error("Broker 不可达,连接失败");
199
+                    return false;
200
+                }
201
+
202
+                if (mqttClient == null) {
203
+                    String clientId = generateUniqueClientId();
204
+                    MemoryPersistence persistence = new MemoryPersistence();
205
+                    mqttClient = new MqttClient(brokerUrl, clientId, persistence);
206
+                    setMqttCallback();
207
+                }
208
+
209
+                int connectRetry = 0;
210
+                while (connectRetry < 3 && !mqttClient.isConnected()) {
211
+                    try {
212
+                        mqttClient.connect(connOpts);
213
+                        if (mqttClient.isConnected()) {
214
+                            isConnected.set(true);
215
+                            log.info("MQTT 连接成功,客户端 ID:{}", mqttClient.getClientId());
216
+                            break;
217
+                        }
218
+                    } catch (MqttException e) {
219
+                        log.error("MQTT 连接失败(第{}次):{}", connectRetry + 1, e.getMessage());
220
+                        connectRetry++;
221
+                        if (connectRetry < 3) Thread.sleep(RECONNECT_INTERVAL * (connectRetry + 1));
222
+                    }
223
+                }
224
+
225
+                if (!mqttClient.isConnected()) {
226
+                    isConnected.set(false);
227
+                    return false;
228
+                }
229
+
230
+                if (!topicSet.isEmpty()) {
231
+                    List<String> topicList = new ArrayList<>(topicSet);
232
+                    for (int i = 0; i < topicList.size(); i += MAX_BATCH_SIZE) {
233
+                        int end = Math.min(i + MAX_BATCH_SIZE, topicList.size());
234
+                        List<String> batch = topicList.subList(i, end);
235
+                        batchSubscribeTopics(new HashSet<>(batch));
236
+                        Thread.sleep(100);
237
+                    }
238
+                }
239
+                return true;
240
+
241
+            } catch (MqttException e) {
242
+                log.error("MQTT 连接 + 订阅失败", e);
243
+                isConnected.set(false);
244
+                triggerReconnect();
245
+                return false;
246
+            } catch (InterruptedException e) {
247
+                Thread.currentThread().interrupt();
248
+                log.error("MQTT 连接 + 订阅被中断");
249
+                isConnected.set(false);
250
+                return false;
251
+            }
252
+        }
253
+    }
254
+
255
+    private void batchSubscribeTopics(Set<String> topicSet) {
256
+        if (topicSet.isEmpty() || !isConnected.get() || mqttClient == null) return;
257
+        try {
258
+            List<String> topicList = new ArrayList<>(topicSet);
259
+            String[] topics = topicList.toArray(new String[0]);
260
+            int[] qosArr = new int[topics.length];
261
+            Arrays.fill(qosArr, QOS);
262
+            mqttClient.subscribe(topics, qosArr);
263
+            log.info("订阅完成:{} 个 Topic", topicList.size());
264
+        } catch (MqttException e) {
265
+            log.error("批量订阅失败:{}", e.getMessage());
266
+            retrySubscribeBatch(new ArrayList<>(topicSet), 1);
267
+        }
268
+    }
269
+
270
+    private void retrySubscribeBatch(List<String> batchTopics, int retryTimes) {
271
+        for (int retry = 1; retry <= retryTimes; retry++) {
272
+            try {
273
+                Thread.sleep(1000 * retry);
274
+                if (!isConnected.get() || mqttClient == null) continue;
275
+                String[] topics = batchTopics.toArray(new String[0]);
276
+                int[] qosArr = new int[topics.length];
277
+                Arrays.fill(qosArr, QOS);
278
+                mqttClient.subscribe(topics, qosArr);
279
+                log.info("重试订阅成功(第{}次),数量:{}", retry, batchTopics.size());
280
+                return;
281
+            } catch (MqttException e) {
282
+                log.error("重试订阅失败(第{}次):{}", retry, e.getMessage());
283
+            } catch (InterruptedException e) {
284
+                Thread.currentThread().interrupt();
285
+                log.error("重试订阅被中断");
286
+                break;
287
+            }
288
+        }
289
+        log.error("批次订阅最终失败:{}", batchTopics);
290
+    }
291
+
292
+    private void subscribeTopics(Set<String> topicsToAdd) {
293
+        if (topicsToAdd.isEmpty() || !isConnected.get()) return;
294
+        batchSubscribeTopics(topicsToAdd);
295
+    }
296
+
297
+    private void unsubscribeTopics(Set<String> topicsToRemove) {
298
+        if (topicsToRemove.isEmpty() || !isConnected.get() || mqttClient == null) return;
299
+        try {
300
+            String[] topics = topicsToRemove.toArray(new String[0]);
301
+            mqttClient.unsubscribe(topics);
302
+            log.info("取消订阅完成,数量:{}", topicsToRemove.size());
303
+        } catch (MqttException e) {
304
+            log.error("取消订阅失败:{}", e.getMessage());
305
+        }
306
+    }
307
+
308
+    private void unsubscribeAll() {
309
+        if (currentTopicSet.isEmpty() || !isConnected.get() || mqttClient == null) return;
310
+        try {
311
+            String[] topics = currentTopicSet.toArray(new String[0]);
312
+            mqttClient.unsubscribe(topics);
313
+            log.info("取消所有订阅,数量:{}", currentTopicSet.size());
314
+        } catch (MqttException e) {
315
+            log.error("取消所有订阅失败:{}", e.getMessage());
316
+        }
317
+    }
318
+
319
+    private void triggerReconnect() {
320
+        coreExecutor.schedule(() -> {
321
+            int maxAttempts = 3;
322
+            int attempt = 1;
323
+            while (attempt <= maxAttempts && !isConnected.get()) {
324
+                try {
325
+                    log.info("MQTT 重连(第{}次)", attempt);
326
+                    if (connectAndSubscribe(currentTopicSet)) {
327
+                        log.info("MQTT 重连成功");
328
+                        break;
329
+                    }
330
+                    attempt++;
331
+                    if (attempt <= maxAttempts) Thread.sleep(RECONNECT_INTERVAL * 2 * attempt);
332
+                } catch (InterruptedException e) {
333
+                    Thread.currentThread().interrupt();
334
+                    break;
335
+                } catch (Exception e) {
336
+                    log.error("MQTT 重连失败(第{}次):{}", attempt, e.getMessage());
337
+                    attempt++;
338
+                }
339
+            }
340
+            if (attempt > maxAttempts) {
341
+                log.error("MQTT 重连达最大次数,停止尝试");
342
+                isConnected.set(false);
343
+            }
344
+        }, 5, TimeUnit.SECONDS);
345
+    }
346
+
347
+    private boolean checkServerAvailability() {
348
+        try (Socket socket = new Socket()) {
349
+            socket.setSoTimeout(CONNECT_TIMEOUT);
350
+            socket.setTcpNoDelay(true);
351
+            socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
352
+            return true;
353
+        } catch (Exception e) {
354
+            log.error("MQTT Broker 不可达:{}", e.getMessage());
355
+            return false;
356
+        }
357
+    }
358
+
359
+    private String generateUniqueClientId() {
360
+        String osPrefix = System.getProperty("os.name").toLowerCase().contains("windows") ? "mqtt_win_" : "mqtt_linux_";
361
+        return osPrefix + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8);
362
+    }
363
+
364
+    private void setMqttCallback() {
365
+        if (mqttClient == null) return;
366
+        mqttClient.setCallback(new MqttCallback() {
367
+            @Override
368
+            public void connectionLost(Throwable cause) {
369
+                log.error("MQTT 连接断开:{}", cause.getMessage());
370
+                isConnected.set(false);
371
+                coreExecutor.schedule(AbstractDynamicMqttConsumer.this::triggerReconnect, 5, TimeUnit.SECONDS);
372
+            }
373
+
374
+            @Override
375
+            public void messageArrived(String topic, MqttMessage message) {
376
+                writeExecutor.submit(() -> {
377
+                    try {
378
+                        String content = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
379
+                        if (content == null || content.trim().isEmpty()) return;
380
+                        processMessage(content, topic);
381
+                    } catch (Exception e) {
382
+                        log.error("MQTT 消息处理失败 (Topic: {}): {}", topic, e.getMessage(), e);
383
+                    }
384
+                });
385
+            }
386
+
387
+            @Override
388
+            public void deliveryComplete(IMqttDeliveryToken token) {
389
+                try {
390
+                    token.waitForCompletion(1000);
391
+                } catch (MqttException ignored) {
392
+                }
393
+            }
394
+        });
395
+    }
396
+
397
+    protected Map<String, Object> deepCopyMap(Map<String, Object> original) {
398
+        if (original == null) return new HashMap<>();
399
+        try {
400
+            ObjectMapper mapper = new ObjectMapper();
401
+            String json = mapper.writeValueAsString(original);
402
+            return mapper.readValue(json, new TypeReference<Map<String, Object>>() {
403
+            });
404
+        } catch (Exception e) {
405
+            Map<String, Object> copy = new HashMap<>(original.size());
406
+            original.forEach((k, v) -> {
407
+                if (v instanceof Map) {
408
+                    copy.put(k, deepCopyMap((Map<String, Object>) v));
409
+                } else {
410
+                    copy.put(k, v);
411
+                }
412
+            });
413
+            return copy;
414
+        }
415
+    }
416
+
417
+    public void disconnect() {
418
+        synchronized (lock) {
419
+            if (mqttClient != null) {
420
+                try {
421
+                    if (mqttClient.isConnected()) {
422
+                        unsubscribeAll();
423
+                        mqttClient.disconnect(10000);
424
+                    }
425
+                    mqttClient.close();
426
+                    log.info("MQTT 连接已断开");
427
+                } catch (MqttException e) {
428
+                    log.error("断开 MQTT 连接失败:{}", e.getMessage());
429
+                } finally {
430
+                    isConnected.set(false);
431
+                    mqttClient = null;
432
+                }
433
+            }
434
+        }
435
+    }
436
+
437
+    @PreDestroy
438
+    public void destroy() {
439
+        log.info("服务正在关闭...");
440
+        disconnect();
441
+    }
442
+}

+ 30
- 432
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Прегледај датотеку

@@ -1,475 +1,73 @@
1 1
 package com.iot.platform.mqtt;
2
+
2 3
 import com.fasterxml.jackson.core.type.TypeReference;
3 4
 import com.fasterxml.jackson.databind.ObjectMapper;
4
-import com.iot.platform.domain.SysController;
5
-import com.iot.platform.service.*;
6
-import org.eclipse.paho.client.mqttv3.*;
7
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
8 5
 import com.iot.platform.config.IotProperties;
6
+import com.iot.platform.service.TDengineService;
9 7
 import org.springframework.beans.factory.annotation.Autowired;
10 8
 import org.springframework.beans.factory.annotation.Qualifier;
11 9
 import org.springframework.context.annotation.DependsOn;
12
-import org.springframework.data.redis.core.StringRedisTemplate;
13 10
 import org.springframework.stereotype.Component;
14
-import javax.annotation.PostConstruct;
15
-import javax.annotation.PreDestroy;
16
-import java.io.IOException;
17
-import java.net.InetSocketAddress;
18
-import java.net.Socket;
19
-import java.nio.charset.StandardCharsets;
11
+
20 12
 import java.time.LocalDate;
21
-import java.time.LocalDateTime;
22
-import java.time.format.DateTimeFormatter;
23 13
 import java.util.*;
24
-import java.util.concurrent.*;
25
-import java.util.concurrent.atomic.AtomicBoolean;
26
-import org.slf4j.Logger;
27
-import org.slf4j.LoggerFactory;
14
+import java.util.concurrent.ExecutorService;
15
+import java.util.concurrent.ScheduledExecutorService;
28 16
 
29
-/**
30
- * TDegine
31
- * 添加接驳庄数据到TDeinge
32
- */
17
+@SuppressWarnings("unchecked")
33 18
 @Component
34
-public class MqttChargeStationConsumer {
35
-
36
-    private static final Logger log = LoggerFactory.getLogger(MqttChargeStationConsumer.class);
19
+@DependsOn({"tdengineService"})
20
+public class MqttChargeStationConsumer extends AbstractDynamicMqttConsumer {
37 21
 
38
-    @Autowired
39
-    private TDengineService tdengineService;
22
+    private final TDengineService tdengineService;
40 23
     private final ObjectMapper objectMapper = new ObjectMapper();
41
-    @Autowired
42
-    private IotProperties iotProperties;
43
-
44
-    // MQTT 配置
45
-    private String brokerUrl;
46
-    private String brokerHost;
47
-    private int brokerPort;
48
-    private static final int QOS = 1;
49
-    private static final int CONNECT_TIMEOUT = 3000;
50
-    private static final int RECONNECT_INTERVAL = 5000;
51
-    private static final int MAX_BATCH_SIZE = 50;
52
-    private String mqttUsername;
53
-    private String mqttPassword;
54
-    private MqttClient mqttClient;
55
-    private MqttConnectOptions connOpts;
56
-    private final Set<String> currentTopicSet = new CopyOnWriteArraySet<>();
57
-    private final AtomicBoolean isConnected = new AtomicBoolean(false);
58
-    private final Object lock = new Object();
59
-
60
-    private final ScheduledExecutorService coreExecutor;
61
-    private final ExecutorService writeExecutor;
62 24
 
63 25
     @Autowired
64
-    public MqttChargeStationConsumer(@Qualifier("mqttCoreExecutor") ScheduledExecutorService coreExecutor,
65
-                                     @Qualifier("mqttWriteExecutor") ExecutorService writeExecutor) {
66
-        this.coreExecutor = coreExecutor;
67
-        this.writeExecutor = writeExecutor;
68
-    }
69
-
70
-    @PostConstruct
71
-    @DependsOn({"sysControllerService", "tdengineService"})
72
-    public void initMqttConnection() {
73
-        this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
74
-        String brokerAddr = this.brokerUrl.replace("tcp://", "");
75
-        int colonIdx = brokerAddr.lastIndexOf(':');
76
-        this.brokerHost = brokerAddr.substring(0, colonIdx);
77
-        this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
78
-        this.mqttUsername = iotProperties.getMqtt().getUsername();
79
-        this.mqttPassword = iotProperties.getMqtt().getPassword();
80
-
81
-        log.info(">>> 开始初始化 MQTT 服务...");
82
-
83
-        CompletableFuture<Boolean> connectFuture = CompletableFuture.supplyAsync(() -> {
84
-            int initRetry = 0;
85
-            while (initRetry < 3 && !isConnected.get()) {
86
-                try {
87
-                    if (refreshMqttSubscription()) {
88
-                        log.info(">>> MQTT 初始化成功");
89
-                        return true;
90
-                    }
91
-                } catch (Exception e) {
92
-                    log.error("MQTT 启动初始化失败(第" + (initRetry + 1) + "次):", e);
93
-                }
94
-                initRetry++;
95
-                try {
96
-                    Thread.sleep(RECONNECT_INTERVAL * initRetry);
97
-                } catch (InterruptedException ex) {
98
-                    Thread.currentThread().interrupt();
99
-                    break;
100
-                }
101
-            }
102
-            return false;
103
-        }, coreExecutor);
104
-
105
-        try {
106
-            Boolean connected = connectFuture.get(10, TimeUnit.SECONDS);
107
-            if (!connected) {
108
-                log.error("!!! MQTT 启动失败(超时),触发后台重连机制");
109
-                triggerReconnect();
110
-            }
111
-        } catch (TimeoutException e) {
112
-            log.error("MQTT 初始化超时(10秒),触发后台重连机制");
113
-            triggerReconnect();
114
-        } catch (InterruptedException e) {
115
-            Thread.currentThread().interrupt();
116
-            log.error("MQTT 初始化被中断");
117
-            triggerReconnect();
118
-        } catch (ExecutionException e) {
119
-            log.error("MQTT 初始化执行异常: ", e.getCause());
120
-            triggerReconnect();
121
-        }
122
-    }
123
-
124
-    private void initMqttConnectOptions() {
125
-        if (connOpts != null) return;
126
-        try {
127
-            connOpts = new MqttConnectOptions();
128
-            connOpts.setCleanSession(false);
129
-            connOpts.setAutomaticReconnect(true);
130
-            connOpts.setConnectionTimeout(10);
131
-            connOpts.setKeepAliveInterval(60);
132
-            connOpts.setMaxInflight(10);
133
-            connOpts.setMaxReconnectDelay(30);
134
-            connOpts.setUserName(mqttUsername);
135
-            connOpts.setPassword(mqttPassword.toCharArray());
136
-        } catch (Exception e) {
137
-            log.error("初始化 MQTT 配置失败:" + e.getMessage());
138
-            throw new RuntimeException(e);
139
-        }
140
-    }
141
-
142
-    public boolean refreshMqttSubscription() {
143
-        synchronized (lock) {
144
-            try {
145
-                initMqttConnectOptions();
146
-                List<String> latestTopicList =new ArrayList<>();
147
-                latestTopicList.add("station/ChargeStation/device/+/post/json");
148
-                log.info("🔍 查询到 Topic 列表: " + latestTopicList);
149
-
150
-                if (latestTopicList == null || latestTopicList.isEmpty()) {
151
-                    log.error("未查询到 Topic,取消所有订阅");
152
-                    unsubscribeAll();
153
-                    currentTopicSet.clear();
154
-                    return false;
155
-                }
156
-
157
-                Set<String> latestTopicSet = new HashSet<>();
158
-                for (String topic : latestTopicList) {
159
-                    if (topic != null && !topic.trim().isEmpty()) {
160
-                        latestTopicSet.add(topic.trim());
161
-                    }
162
-                }
163
-
164
-                if (latestTopicSet.equals(currentTopicSet)) {
165
-                    log.info("Topic 列表无变化,无需刷新订阅关系");
166
-                    if (!isConnected.get() && !latestTopicSet.isEmpty()) {
167
-                        return connectAndSubscribe(latestTopicSet);
168
-                    }
169
-                    return true;
170
-                }
171
-
172
-                Set<String> topicsToAdd = new HashSet<>(latestTopicSet);
173
-                topicsToAdd.removeAll(currentTopicSet);
174
-                Set<String> topicsToRemove = new HashSet<>(currentTopicSet);
175
-                topicsToRemove.removeAll(latestTopicSet);
176
-
177
-                if (!isConnected.get()) {
178
-                    return connectAndSubscribe(latestTopicSet);
179
-                } else {
180
-                    if (!topicsToRemove.isEmpty()) unsubscribeTopics(topicsToRemove);
181
-                    if (!topicsToAdd.isEmpty()) subscribeTopics(topicsToAdd);
182
-                }
183
-
184
-                currentTopicSet.clear();
185
-                currentTopicSet.addAll(latestTopicSet);
186
-                log.info("MQTT 订阅刷新完成,当前数量:" + currentTopicSet.size());
187
-                return true;
188
-
189
-            } catch (Exception e) {
190
-                log.error("MQTT 订阅刷新失败:", e);
191
-                return false;
192
-            }
193
-        }
194
-    }
195
-
196
-    private boolean connectAndSubscribe(Set<String> topicSet) {
197
-        synchronized (lock) {
198
-            if (isConnected.get() && mqttClient != null && mqttClient.isConnected()) {
199
-                return true;
200
-            }
201
-            try {
202
-                if (!checkServerAvailability()) {
203
-                    log.error("Broker 不可达,连接失败");
204
-                    return false;
205
-                }
206
-
207
-                if (mqttClient == null) {
208
-                    String clientId = generateUniqueClientId();
209
-                    MemoryPersistence persistence = new MemoryPersistence();
210
-                    mqttClient = new MqttClient(brokerUrl, clientId, persistence);
211
-                    setMqttCallback();
212
-                }
213
-
214
-                int connectRetry = 0;
215
-                while (connectRetry < 3 && !mqttClient.isConnected()) {
216
-                    try {
217
-                        mqttClient.connect(connOpts);
218
-                        if (mqttClient.isConnected()) {
219
-                            isConnected.set(true);
220
-                            log.info("MQTT 连接成功,客户端 ID:" + mqttClient.getClientId());
221
-                            break;
222
-                        }
223
-                    } catch (MqttException e) {
224
-                        log.error("MQTT 连接失败(第" + (connectRetry + 1) + "次):" + e.getMessage());
225
-                        connectRetry++;
226
-                        if (connectRetry < 3) Thread.sleep(RECONNECT_INTERVAL * (connectRetry + 1));
227
-                    }
228
-                }
229
-
230
-                if (!mqttClient.isConnected()) {
231
-                    isConnected.set(false);
232
-                    return false;
233
-                }
234
-
235
-                if (!topicSet.isEmpty()) {
236
-                    List<String> topicList = new ArrayList<>(topicSet);
237
-                    for (int i = 0; i < topicList.size(); i += MAX_BATCH_SIZE) {
238
-                        int end = Math.min(i + MAX_BATCH_SIZE, topicList.size());
239
-                        List<String> batch = topicList.subList(i, end);
240
-                        batchSubscribeTopics(new HashSet<>(batch));
241
-                        Thread.sleep(100);
242
-                    }
243
-                }
244
-                return true;
245
-
246
-            } catch (MqttException e) {
247
-                log.error("MQTT 连接 + 订阅失败:", e);
248
-                isConnected.set(false);
249
-                triggerReconnect();
250
-                return false;
251
-            } catch (InterruptedException e) {
252
-                Thread.currentThread().interrupt();
253
-                log.error("MQTT 连接 + 订阅被中断");
254
-                isConnected.set(false);
255
-                return false;
256
-            }
257
-        }
258
-    }
259
-
260
-    private void batchSubscribeTopics(Set<String> topicSet) {
261
-        if (topicSet.isEmpty() || !isConnected.get() || mqttClient == null) return;
262
-        try {
263
-            List<String> topicList = new ArrayList<>(topicSet);
264
-            String[] topics = topicList.toArray(new String[0]);
265
-            int[] qosArr = new int[topics.length];
266
-            Arrays.fill(qosArr, QOS);
267
-            mqttClient.subscribe(topics, qosArr);
268
-            log.info("订阅完成:" + topicList.size() + "个 Topic");
269
-        } catch (MqttException e) {
270
-            log.error("批量订阅失败:" + e.getMessage());
271
-            retrySubscribeBatch(new ArrayList<>(topicSet), 1);
272
-        }
273
-    }
274
-
275
-    private void retrySubscribeBatch(List<String> batchTopics, int retryTimes) {
276
-        for (int retry = 1; retry <= retryTimes; retry++) {
277
-            try {
278
-                Thread.sleep(1000 * retry);
279
-                if (!isConnected.get() || mqttClient == null) continue;
280
-                String[] topics = batchTopics.toArray(new String[0]);
281
-                int[] qosArr = new int[topics.length];
282
-                Arrays.fill(qosArr, QOS);
283
-                mqttClient.subscribe(topics, qosArr);
284
-                log.info("重试订阅成功(第" + retry + "次),数量:" + batchTopics.size());
285
-                return;
286
-            } catch (MqttException e) {
287
-                log.error("重试订阅失败(第" + retry + "次):" + e.getMessage());
288
-            } catch (InterruptedException e) {
289
-                Thread.currentThread().interrupt();
290
-                log.error("重试订阅被中断");
291
-                break;
292
-            }
293
-        }
294
-        log.error("批次订阅最终失败:" + batchTopics);
295
-    }
296
-
297
-    private void subscribeTopics(Set<String> topicsToAdd) {
298
-        if (topicsToAdd.isEmpty() || !isConnected.get()) return;
299
-        batchSubscribeTopics(topicsToAdd);
300
-    }
301
-
302
-    private void unsubscribeTopics(Set<String> topicsToRemove) {
303
-        if (topicsToRemove.isEmpty() || !isConnected.get() || mqttClient == null) return;
304
-        try {
305
-            String[] topics = topicsToRemove.toArray(new String[0]);
306
-            mqttClient.unsubscribe(topics);
307
-            log.info("取消订阅完成,数量:" + topicsToRemove.size());
308
-        } catch (MqttException e) {
309
-            log.error("取消订阅失败:" + e.getMessage());
26
+    public MqttChargeStationConsumer(IotProperties iotProperties,
27
+                                     @Qualifier("mqttCoreExecutor") ScheduledExecutorService coreExecutor,
28
+                                     @Qualifier("mqttWriteExecutor") ExecutorService writeExecutor,
29
+                                     TDengineService tdengineService) {
30
+        super(iotProperties, coreExecutor, writeExecutor);
31
+        this.tdengineService = tdengineService;
32
+    }
33
+
34
+    @Override
35
+    protected List<String> fetchTopics() {
36
+        String topic = iotProperties.getMqtt().getChargeStationTopic();
37
+        if (topic == null || topic.trim().isEmpty()) {
38
+            log.warn("ChargeStation topic 未配置");
39
+            return Collections.emptyList();
310 40
         }
41
+        return Collections.singletonList(topic.trim());
311 42
     }
312 43
 
313
-    private void unsubscribeAll() {
314
-        if (currentTopicSet.isEmpty() || !isConnected.get() || mqttClient == null) return;
315
-        try {
316
-            String[] topics = currentTopicSet.toArray(new String[0]);
317
-            mqttClient.unsubscribe(topics);
318
-            log.info("取消所有订阅,数量:" + currentTopicSet.size());
319
-        } catch (MqttException e) {
320
-            log.error("取消所有订阅失败:" + e.getMessage());
321
-        }
322
-    }
323
-
324
-    private void triggerReconnect() {
325
-        coreExecutor.schedule(() -> {
326
-            int maxAttempts = 3;
327
-            int attempt = 1;
328
-            while (attempt <= maxAttempts && !isConnected.get()) {
329
-                try {
330
-                    log.info("MQTT 重连(第" + attempt + "次)");
331
-                    if (connectAndSubscribe(currentTopicSet)) {
332
-                        log.info("MQTT 重连成功");
333
-                        break;
334
-                    }
335
-                    attempt++;
336
-                    if (attempt <= maxAttempts) Thread.sleep(RECONNECT_INTERVAL * 2 * attempt);
337
-                } catch (InterruptedException e) {
338
-                    Thread.currentThread().interrupt();
339
-                    break;
340
-                } catch (Exception e) {
341
-                    log.error("MQTT 重连失败(第" + attempt + "次):" + e.getMessage());
342
-                    attempt++;
343
-                }
344
-            }
345
-            if (attempt > maxAttempts) {
346
-                log.error("MQTT 重连达最大次数,停止尝试");
347
-                isConnected.set(false);
348
-            }
349
-        }, 5, TimeUnit.SECONDS);
350
-    }
351
-
352
-    private boolean checkServerAvailability() {
353
-        try (Socket socket = new Socket()) {
354
-            socket.setSoTimeout(CONNECT_TIMEOUT);
355
-            socket.setTcpNoDelay(true);
356
-            socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
357
-            return true;
358
-        } catch (Exception e) {
359
-            log.error("MQTT Broker 不可达:" + e.getMessage());
360
-            return false;
361
-        }
44
+    @Override
45
+    protected void processMessage(String content, String topic) throws Exception {
46
+        List<Map<String, Object>> messageList = objectMapper.readValue(content, new TypeReference<List<Map<String, Object>>>() {});
47
+        processMessageAndWriteToTDengine(messageList, topic);
362 48
     }
363 49
 
364
-    private String generateUniqueClientId() {
365
-        String osPrefix = System.getProperty("os.name").toLowerCase().contains("windows") ? "mqtt_win_" : "mqtt_linux_";
366
-        return osPrefix + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8);
367
-    }
368
-
369
-    private void setMqttCallback() {
370
-        if (mqttClient == null) return;
371
-        mqttClient.setCallback(new MqttCallback() {
372
-            @Override
373
-            public void connectionLost(Throwable cause) {
374
-                log.error("MQTT 连接断开:" + cause.getMessage());
375
-                isConnected.set(false);
376
-                coreExecutor.schedule(() -> triggerReconnect(), 5, TimeUnit.SECONDS);
377
-            }
378
-
379
-            @Override
380
-            public void messageArrived(String topic, MqttMessage message) {
381
-                // 直接提交到写入线程池处理
382
-                writeExecutor.submit(() -> {
383
-                    try {
384
-                        String content = new String(message.getPayload(), StandardCharsets.UTF_8);
385
-                        if (content == null || content.trim().isEmpty()) return;
386
-                        List<Map<String, Object>> messageList = objectMapper.readValue(content, new TypeReference<List<Map<String, Object>>>() {});
387
-
388
-                        processMessageAndWriteToTDengine(messageList, topic);
389
-                    } catch (Exception e) {
390
-                        log.error("MQTT 消息处理失败(Topic:" + topic + "):", e);
391
-                    }
392
-                });
393
-            }
394
-
395
-            @Override
396
-            public void deliveryComplete(IMqttDeliveryToken token) {
397
-                try { token.waitForCompletion(1000); } catch (MqttException ignored) {}
398
-            }
399
-        });
400
-    }
401
-
402
-    // 参数改为 List
403 50
     private void processMessageAndWriteToTDengine(List<Map<String, Object>> dataList, String topic) throws Exception {
404 51
         String[] topicParts = topic.split("/");
405 52
         if (topicParts.length < 2) {
406 53
             throw new IllegalArgumentException("无效的 Topic 格式:" + topic);
407 54
         }
408
-        // TDengine 批量插入通常需要一个列表
55
+
409 56
         List<Map<String, Object>> batchToInsert = new ArrayList<>();
410 57
         for (Map<String, Object> dataMap : dataList) {
411
-            // 检查数据是否为空
412 58
             if (dataMap == null || dataMap.isEmpty()) {
413 59
                 continue;
414 60
             }
415
-            // 深拷贝
416 61
             Map<String, Object> list = deepCopyMap(dataMap);
417 62
             batchToInsert.add(list);
418 63
         }
419 64
 
420
-        // 如果有数据,才写入数据库
421 65
         if (!batchToInsert.isEmpty()) {
422 66
             String dbName = topicParts[1];
423 67
             String superTable = topicParts[3];
424 68
             LocalDate date = LocalDate.now();
425 69
             String tableName = superTable + "_" + date.getYear() + date.getMonthValue();
426
-            // 调用写入方法(假设 insertBatch 接收 List)
427 70
             tdengineService.insertBatch(dbName, tableName, batchToInsert);
428 71
         }
429 72
     }
430
-
431
-    private Map<String, Object> deepCopyMap(Map<String, Object> original) {
432
-        if (original == null) return new HashMap<>();
433
-        try {
434
-            String json = objectMapper.writeValueAsString(original);
435
-            return objectMapper.readValue(json, new TypeReference<Map<String, Object>>() {});
436
-        } catch (Exception e) {
437
-            Map<String, Object> copy = new HashMap<>(original.size());
438
-            original.forEach((k, v) -> {
439
-                if (v instanceof Map) {
440
-                    copy.put(k, deepCopyMap((Map<String, Object>) v));
441
-                } else {
442
-                    copy.put(k, v);
443
-                }
444
-            });
445
-            return copy;
446
-        }
447
-    }
448
-
449
-    public void disconnect() {
450
-        synchronized (lock) {
451
-            if (mqttClient != null) {
452
-                try {
453
-                    if (mqttClient.isConnected()) {
454
-                        unsubscribeAll();
455
-                        mqttClient.disconnect(10000);
456
-                    }
457
-                    mqttClient.close();
458
-                    log.info("MQTT 连接已断开");
459
-                } catch (MqttException e) {
460
-                    log.error("断开 MQTT 连接失败:" + e.getMessage());
461
-                } finally {
462
-                    isConnected.set(false);
463
-                    mqttClient = null;
464
-                }
465
-            }
466
-            if (tdengineService != null) tdengineService.close();
467
-        }
468
-    }
469
-
470
-    @PreDestroy
471
-    public void destroy() {
472
-        log.info(">>> 服务正在关闭...");
473
-        disconnect();
474
-    }
475
-}
73
+}

+ 30
- 437
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Прегледај датотеку

@@ -2,411 +2,58 @@ package com.iot.platform.mqtt;
2 2
 
3 3
 import com.fasterxml.jackson.core.type.TypeReference;
4 4
 import com.fasterxml.jackson.databind.ObjectMapper;
5
+import com.iot.platform.config.IotProperties;
5 6
 import com.iot.platform.domain.SysController;
6 7
 import com.iot.platform.service.SysControllerService;
7 8
 import com.iot.platform.service.TDengineService;
8 9
 import org.eclipse.paho.client.mqttv3.*;
9
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
10
-import com.iot.platform.config.IotProperties;
11 10
 import org.springframework.beans.factory.annotation.Autowired;
12 11
 import org.springframework.beans.factory.annotation.Qualifier;
13 12
 import org.springframework.context.annotation.DependsOn;
14 13
 import org.springframework.data.redis.core.StringRedisTemplate;
15 14
 import org.springframework.stereotype.Component;
16 15
 
17
-import javax.annotation.PostConstruct;
18
-import javax.annotation.PreDestroy;
19
-import java.net.InetSocketAddress;
20
-import java.net.Socket;
21
-import java.nio.charset.StandardCharsets;
22 16
 import java.time.LocalDate;
23 17
 import java.time.LocalDateTime;
24 18
 import java.time.format.DateTimeFormatter;
25 19
 import java.util.*;
26
-import java.util.concurrent.*;
27
-import java.util.concurrent.atomic.AtomicBoolean;
28
-import org.slf4j.Logger;
29
-import org.slf4j.LoggerFactory;
20
+import java.util.concurrent.ExecutorService;
21
+import java.util.concurrent.ScheduledExecutorService;
22
+import java.util.concurrent.TimeUnit;
30 23
 
31
-@Component
32 24
 @SuppressWarnings("unchecked")
33
-public class MqttDynamicConsumer {
34
-
35
-    private static final Logger log = LoggerFactory.getLogger(MqttDynamicConsumer.class);
36
-
37
-    @Autowired
38
-    private SysControllerService sysControllerService;
39
-
40
-    @Autowired
41
-    private TDengineService tdengineService;
25
+@Component
26
+@DependsOn({"sysControllerService", "tdengineService"})
27
+public class MqttDynamicConsumer extends AbstractDynamicMqttConsumer {
42 28
 
29
+    private final SysControllerService sysControllerService;
30
+    private final TDengineService tdengineService;
31
+    private final StringRedisTemplate stringRedisTemplate;
43 32
     private final ObjectMapper objectMapper = new ObjectMapper();
44
-    @Autowired
45
-    private StringRedisTemplate stringRedisTemplate;
46 33
 
47 34
     @Autowired
48
-    private IotProperties iotProperties;
49
-
50
-    // MQTT 配置
51
-    private String brokerUrl;
52
-    private String brokerHost;
53
-    private int brokerPort;
54
-    private static final int QOS = 1;
55
-    private static final int CONNECT_TIMEOUT = 3000;
56
-    private static final int RECONNECT_INTERVAL = 5000;
57
-    private static final int MAX_BATCH_SIZE = 50;
58
-
59
-    private String mqttUsername;
60
-    private String mqttPassword;
61
-
62
-    private MqttClient mqttClient;
63
-    private MqttConnectOptions connOpts;
64
-    private final Set<String> currentTopicSet = new CopyOnWriteArraySet<>();
65
-    private final AtomicBoolean isConnected = new AtomicBoolean(false);
66
-    private final Object lock = new Object();
67
-
68
-    private final ScheduledExecutorService coreExecutor;
69
-    private final ExecutorService writeExecutor;
70
-
71
-    @Autowired
72
-    public MqttDynamicConsumer(@Qualifier("mqttCoreExecutor") ScheduledExecutorService coreExecutor,
73
-                               @Qualifier("mqttWriteExecutor") ExecutorService writeExecutor) {
74
-        this.coreExecutor = coreExecutor;
75
-        this.writeExecutor = writeExecutor;
76
-    }
77
-
78
-    @PostConstruct
79
-    @DependsOn({"sysControllerService", "tdengineService"})
80
-    public void initMqttConnection() {
81
-        this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
82
-        String brokerAddr = this.brokerUrl.replace("tcp://", "");
83
-        int colonIdx = brokerAddr.lastIndexOf(':');
84
-        this.brokerHost = brokerAddr.substring(0, colonIdx);
85
-        this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
86
-        this.mqttUsername = iotProperties.getMqtt().getUsername();
87
-        this.mqttPassword = iotProperties.getMqtt().getPassword();
88
-
89
-        log.info(">>> 开始初始化 MQTT 服务...");
90
-
91
-        CompletableFuture<Boolean> connectFuture = CompletableFuture.supplyAsync(() -> {
92
-            int initRetry = 0;
93
-            while (initRetry < 3 && !isConnected.get()) {
94
-                try {
95
-                    if (refreshMqttSubscription()) {
96
-                        log.info(">>> MQTT 初始化成功");
97
-                        return true;
98
-                    }
99
-                } catch (Exception e) {
100
-                    log.error("MQTT 启动初始化失败(第" + (initRetry + 1) + "次):", e);
101
-                }
102
-                initRetry++;
103
-                try {
104
-                    Thread.sleep(RECONNECT_INTERVAL * initRetry);
105
-                } catch (InterruptedException ex) {
106
-                    Thread.currentThread().interrupt();
107
-                    break;
108
-                }
109
-            }
110
-            return false;
111
-        }, coreExecutor);
112
-
113
-        try {
114
-            Boolean connected = connectFuture.get(10, TimeUnit.SECONDS);
115
-            if (!connected) {
116
-                log.error("!!! MQTT 启动失败(超时),触发后台重连机制");
117
-                triggerReconnect();
118
-            }
119
-        } catch (TimeoutException e) {
120
-            log.error("MQTT 初始化超时(10秒),触发后台重连机制");
121
-            triggerReconnect();
122
-        } catch (InterruptedException e) {
123
-            Thread.currentThread().interrupt();
124
-            log.error("MQTT 初始化被中断");
125
-            triggerReconnect();
126
-        } catch (ExecutionException e) {
127
-            log.error("MQTT 初始化执行异常: ", e.getCause());
128
-            triggerReconnect();
129
-        }
35
+    public MqttDynamicConsumer(IotProperties iotProperties,
36
+                               @Qualifier("mqttCoreExecutor") ScheduledExecutorService coreExecutor,
37
+                               @Qualifier("mqttWriteExecutor") ExecutorService writeExecutor,
38
+                               SysControllerService sysControllerService,
39
+                               TDengineService tdengineService,
40
+                               StringRedisTemplate stringRedisTemplate) {
41
+        super(iotProperties, coreExecutor, writeExecutor);
42
+        this.sysControllerService = sysControllerService;
43
+        this.tdengineService = tdengineService;
44
+        this.stringRedisTemplate = stringRedisTemplate;
130 45
     }
131 46
 
132
-    private void initMqttConnectOptions() {
133
-        if (connOpts != null) return;
134
-        try {
135
-            connOpts = new MqttConnectOptions();
136
-            connOpts.setCleanSession(false);
137
-            connOpts.setAutomaticReconnect(true);
138
-            connOpts.setConnectionTimeout(10);
139
-            connOpts.setKeepAliveInterval(60);
140
-            connOpts.setMaxInflight(10);
141
-            connOpts.setMaxReconnectDelay(30);
142
-            connOpts.setUserName(mqttUsername);
143
-            connOpts.setPassword(mqttPassword.toCharArray());
144
-        } catch (Exception e) {
145
-            log.error("初始化 MQTT 配置失败:" + e.getMessage());
146
-            throw new RuntimeException(e);
147
-        }
47
+    @Override
48
+    protected List<String> fetchTopics() {
49
+        return sysControllerService.selectall();
148 50
     }
149 51
 
150
-    public boolean refreshMqttSubscription() {
151
-        synchronized (lock) {
152
-            try {
153
-                initMqttConnectOptions();
154
-                List<String> latestTopicList = sysControllerService.selectall();
155
-                log.info("🔍 查询到 Topic 列表: " + latestTopicList);
156
-
157
-                if (latestTopicList == null || latestTopicList.isEmpty()) {
158
-                    log.error("未查询到 Topic,取消所有订阅");
159
-                    unsubscribeAll();
160
-                    currentTopicSet.clear();
161
-                    return false;
162
-                }
163
-
164
-                Set<String> latestTopicSet = new HashSet<>();
165
-                for (String topic : latestTopicList) {
166
-                    if (topic != null && !topic.trim().isEmpty()) {
167
-                        latestTopicSet.add(topic.trim());
168
-                    }
169
-                }
170
-
171
-                if (latestTopicSet.equals(currentTopicSet)) {
172
-                    log.info("Topic 列表无变化,无需刷新订阅关系");
173
-                    if (!isConnected.get() && !latestTopicSet.isEmpty()) {
174
-                        return connectAndSubscribe(latestTopicSet);
175
-                    }
176
-                    return true;
177
-                }
178
-
179
-                Set<String> topicsToAdd = new HashSet<>(latestTopicSet);
180
-                topicsToAdd.removeAll(currentTopicSet);
181
-                Set<String> topicsToRemove = new HashSet<>(currentTopicSet);
182
-                topicsToRemove.removeAll(latestTopicSet);
183
-
184
-                if (!isConnected.get()) {
185
-                    return connectAndSubscribe(latestTopicSet);
186
-                } else {
187
-                    if (!topicsToRemove.isEmpty()) unsubscribeTopics(topicsToRemove);
188
-                    if (!topicsToAdd.isEmpty()) subscribeTopics(topicsToAdd);
189
-                }
190
-
191
-                currentTopicSet.clear();
192
-                currentTopicSet.addAll(latestTopicSet);
193
-                log.info("MQTT 订阅刷新完成,当前数量:" + currentTopicSet.size());
194
-                return true;
195
-
196
-            } catch (Exception e) {
197
-                log.error("MQTT 订阅刷新失败:", e);
198
-                return false;
199
-            }
200
-        }
201
-    }
202
-
203
-    private boolean connectAndSubscribe(Set<String> topicSet) {
204
-        synchronized (lock) {
205
-            if (isConnected.get() && mqttClient != null && mqttClient.isConnected()) {
206
-                return true;
207
-            }
208
-            try {
209
-                if (!checkServerAvailability()) {
210
-                    log.error("Broker 不可达,连接失败");
211
-                    return false;
212
-                }
213
-
214
-                if (mqttClient == null) {
215
-                    String clientId = generateUniqueClientId();
216
-                    MemoryPersistence persistence = new MemoryPersistence();
217
-                    mqttClient = new MqttClient(brokerUrl, clientId, persistence);
218
-                    setMqttCallback();
219
-                }
220
-
221
-                int connectRetry = 0;
222
-                while (connectRetry < 3 && !mqttClient.isConnected()) {
223
-                    try {
224
-                        mqttClient.connect(connOpts);
225
-                        if (mqttClient.isConnected()) {
226
-                            isConnected.set(true);
227
-                            log.info("MQTT 连接成功,客户端 ID:" + mqttClient.getClientId());
228
-                            break;
229
-                        }
230
-                    } catch (MqttException e) {
231
-                        log.error("MQTT 连接失败(第" + (connectRetry + 1) + "次):" + e.getMessage());
232
-                        connectRetry++;
233
-                        if (connectRetry < 3) Thread.sleep(RECONNECT_INTERVAL * (connectRetry + 1));
234
-                    }
235
-                }
236
-
237
-                if (!mqttClient.isConnected()) {
238
-                    isConnected.set(false);
239
-                    return false;
240
-                }
241
-
242
-                if (!topicSet.isEmpty()) {
243
-                    List<String> topicList = new ArrayList<>(topicSet);
244
-                    for (int i = 0; i < topicList.size(); i += MAX_BATCH_SIZE) {
245
-                        int end = Math.min(i + MAX_BATCH_SIZE, topicList.size());
246
-                        List<String> batch = topicList.subList(i, end);
247
-                        batchSubscribeTopics(new HashSet<>(batch));
248
-                        Thread.sleep(100);
249
-                    }
250
-                }
251
-                return true;
252
-
253
-            } catch (MqttException e) {
254
-                log.error("MQTT 连接 + 订阅失败:", e);
255
-                isConnected.set(false);
256
-                triggerReconnect();
257
-                return false;
258
-            } catch (InterruptedException e) {
259
-                Thread.currentThread().interrupt();
260
-                log.error("MQTT 连接 + 订阅被中断");
261
-                isConnected.set(false);
262
-                return false;
263
-            }
264
-        }
265
-    }
266
-
267
-
268
-
269
-
270
-    private void batchSubscribeTopics(Set<String> topicSet) {
271
-        if (topicSet.isEmpty() || !isConnected.get() || mqttClient == null) return;
272
-        try {
273
-            List<String> topicList = new ArrayList<>(topicSet);
274
-            String[] topics = topicList.toArray(new String[0]);
275
-            int[] qosArr = new int[topics.length];
276
-            Arrays.fill(qosArr, QOS);
277
-            mqttClient.subscribe(topics, qosArr);
278
-            log.info("订阅完成:" + topicList.size() + "个 Topic");
279
-        } catch (MqttException e) {
280
-            log.error("批量订阅失败:" + e.getMessage());
281
-            retrySubscribeBatch(new ArrayList<>(topicSet), 1);
282
-        }
283
-    }
284
-
285
-    private void retrySubscribeBatch(List<String> batchTopics, int retryTimes) {
286
-        for (int retry = 1; retry <= retryTimes; retry++) {
287
-            try {
288
-                Thread.sleep(1000 * retry);
289
-                if (!isConnected.get() || mqttClient == null) continue;
290
-                String[] topics = batchTopics.toArray(new String[0]);
291
-                int[] qosArr = new int[topics.length];
292
-                Arrays.fill(qosArr, QOS);
293
-                mqttClient.subscribe(topics, qosArr);
294
-                log.info("重试订阅成功(第" + retry + "次),数量:" + batchTopics.size());
295
-                return;
296
-            } catch (MqttException e) {
297
-                log.error("重试订阅失败(第" + retry + "次):" + e.getMessage());
298
-            } catch (InterruptedException e) {
299
-                Thread.currentThread().interrupt();
300
-                log.error("重试订阅被中断");
301
-                break;
302
-            }
303
-        }
304
-        log.error("批次订阅最终失败:" + batchTopics);
305
-    }
306
-
307
-    private void subscribeTopics(Set<String> topicsToAdd) {
308
-        if (topicsToAdd.isEmpty() || !isConnected.get()) return;
309
-        batchSubscribeTopics(topicsToAdd);
310
-    }
311
-
312
-    private void unsubscribeTopics(Set<String> topicsToRemove) {
313
-        if (topicsToRemove.isEmpty() || !isConnected.get() || mqttClient == null) return;
314
-        try {
315
-            String[] topics = topicsToRemove.toArray(new String[0]);
316
-            mqttClient.unsubscribe(topics);
317
-            log.info("取消订阅完成,数量:" + topicsToRemove.size());
318
-        } catch (MqttException e) {
319
-            log.error("取消订阅失败:" + e.getMessage());
320
-        }
321
-    }
322
-
323
-    private void unsubscribeAll() {
324
-        if (currentTopicSet.isEmpty() || !isConnected.get() || mqttClient == null) return;
325
-        try {
326
-            String[] topics = currentTopicSet.toArray(new String[0]);
327
-            mqttClient.unsubscribe(topics);
328
-            log.info("取消所有订阅,数量:" + currentTopicSet.size());
329
-        } catch (MqttException e) {
330
-            log.error("取消所有订阅失败:" + e.getMessage());
331
-        }
332
-    }
333
-
334
-    private void triggerReconnect() {
335
-        coreExecutor.schedule(() -> {
336
-            int maxAttempts = 3;
337
-            int attempt = 1;
338
-            while (attempt <= maxAttempts && !isConnected.get()) {
339
-                try {
340
-                    log.info("MQTT 重连(第" + attempt + "次)");
341
-                    if (connectAndSubscribe(currentTopicSet)) {
342
-                        log.info("MQTT 重连成功");
343
-                        break;
344
-                    }
345
-                    attempt++;
346
-                    if (attempt <= maxAttempts) Thread.sleep(RECONNECT_INTERVAL * 2 * attempt);
347
-                } catch (InterruptedException e) {
348
-                    Thread.currentThread().interrupt();
349
-                    break;
350
-                } catch (Exception e) {
351
-                    log.error("MQTT 重连失败(第" + attempt + "次):" + e.getMessage());
352
-                    attempt++;
353
-                }
354
-            }
355
-            if (attempt > maxAttempts) {
356
-                log.error("MQTT 重连达最大次数,停止尝试");
357
-                isConnected.set(false);
358
-            }
359
-        }, 5, TimeUnit.SECONDS);
360
-    }
361
-
362
-    private boolean checkServerAvailability() {
363
-        try (Socket socket = new Socket()) {
364
-            socket.setSoTimeout(CONNECT_TIMEOUT);
365
-            socket.setTcpNoDelay(true);
366
-            socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
367
-            return true;
368
-        } catch (Exception e) {
369
-            log.error("MQTT Broker 不可达:" + e.getMessage());
370
-            return false;
371
-        }
372
-    }
373
-
374
-    private String generateUniqueClientId() {
375
-        String osPrefix = System.getProperty("os.name").toLowerCase().contains("windows") ? "mqtt_win_" : "mqtt_linux_";
376
-        return osPrefix + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8);
377
-    }
378
-
379
-    private void setMqttCallback() {
380
-        if (mqttClient == null) return;
381
-        mqttClient.setCallback(new MqttCallback() {
382
-            @Override
383
-            public void connectionLost(Throwable cause) {
384
-                log.error("MQTT 连接断开:" + cause.getMessage());
385
-                isConnected.set(false);
386
-                coreExecutor.schedule(() -> triggerReconnect(), 5, TimeUnit.SECONDS);
387
-            }
388
-
389
-            @Override
390
-            public void messageArrived(String topic, MqttMessage message) {
391
-                // 直接提交到写入线程池处理
392
-                writeExecutor.submit(() -> {
393
-                    try {
394
-                        String content = new String(message.getPayload(), StandardCharsets.UTF_8);
395
-                        if (content == null || content.trim().isEmpty()) return;
396
-
397
-                        Map<String, Object> messageMap = objectMapper.readValue(content, new TypeReference<Map<String, Object>>() {});
398
-                        processMessageAndWriteToTDengine(messageMap, topic);
399
-                    } catch (Exception e) {
400
-                        log.error("MQTT 消息处理失败(Topic:" + topic + "):", e);
401
-                    }
402
-                });
403
-            }
404
-
405
-            @Override
406
-            public void deliveryComplete(IMqttDeliveryToken token) {
407
-                try { token.waitForCompletion(1000); } catch (MqttException ignored) {}
408
-            }
409
-        });
52
+    @Override
53
+    protected void processMessage(String content, String topic) throws Exception {
54
+        Map<String, Object> weather = objectMapper.readValue(content, new TypeReference<Map<String, Object>>() {});
55
+        processMessageAndWriteToTDengine(weather, topic);
56
+        insertredis(weather, topic);
410 57
     }
411 58
 
412 59
     private void processMessageAndWriteToTDengine(Map<String, Object> weather, String topic) throws Exception {
@@ -433,7 +80,6 @@ public class MqttDynamicConsumer {
433 80
         list.put("timestamp", weather.get("timestamp"));
434 81
         list.put("device_id", weather.get("device_id"));
435 82
 
436
-        // 构建单条数据并立即写入(可改为批量缓冲)
437 83
         String dbName = topicParts[0];
438 84
         String superTable = topicParts[1];
439 85
         LocalDate date = LocalDate.now();
@@ -441,16 +87,13 @@ public class MqttDynamicConsumer {
441 87
 
442 88
         List<Map<String, Object>> batch = Collections.singletonList(list);
443 89
         tdengineService.insertBatch(dbName, tableName, batch);
444
-        insertredis(weather,topic);
445 90
     }
446 91
 
447
-
448 92
     public void insertredis(Map<String, Object> weather, String topic) throws Exception {
449 93
         String[] topicParts = topic.split("/", 2);
450 94
         if (topicParts.length < 2) return;
451 95
 
452 96
         String controllerId = topicParts[0];
453
-        String deviceIdFromTopic = topicParts[1]; // 虽然不用在 key 中,但可记录
454 97
 
455 98
         SysController sysController = sysControllerService.selectcontrollerpath(topic);
456 99
         if (sysController == null || sysController.getName() == null) return;
@@ -461,14 +104,12 @@ public class MqttDynamicConsumer {
461 104
         Map<String, Object> metricData = (Map<String, Object>) weather.get(sysController.getName());
462 105
         if (metricData == null) return;
463 106
 
464
-        // ✅ 使用标准格式:DSB:controllerId:metricName
465 107
         String redisKey = "DSB:" + controllerId + ":" + sysController.getName();
466 108
 
467
-        // 准备写入的数据(包含系统字段)
468 109
         Map<String, String> hashData = new HashMap<>();
469 110
         hashData.put("createTime", createTime);
470 111
         hashData.put("timestamp", getStringFromMap(weather, "timestamp"));
471
-        hashData.put("device_id", getStringFromMap(weather, "device_id")); // UUID
112
+        hashData.put("device_id", getStringFromMap(weather, "device_id"));
472 113
 
473 114
         for (Map.Entry<String, Object> entry : metricData.entrySet()) {
474 115
             if (entry.getValue() != null) {
@@ -479,7 +120,6 @@ public class MqttDynamicConsumer {
479 120
         stringRedisTemplate.opsForHash().putAll(redisKey, hashData);
480 121
         stringRedisTemplate.expire(redisKey, 2, TimeUnit.HOURS);
481 122
 
482
-        // 记录到活跃集合
483 123
         stringRedisTemplate.opsForSet().add("DSB:active:devices", redisKey);
484 124
         stringRedisTemplate.expire("DSB:active:devices", 2, TimeUnit.HOURS);
485 125
     }
@@ -488,51 +128,4 @@ public class MqttDynamicConsumer {
488 128
         Object val = map.get(key);
489 129
         return val == null ? "" : val.toString().trim();
490 130
     }
491
-
492
-
493
-
494
-    private Map<String, Object> deepCopyMap(Map<String, Object> original) {
495
-        if (original == null) return new HashMap<>();
496
-        try {
497
-            String json = objectMapper.writeValueAsString(original);
498
-            return objectMapper.readValue(json, new TypeReference<Map<String, Object>>() {});
499
-        } catch (Exception e) {
500
-            Map<String, Object> copy = new HashMap<>(original.size());
501
-            original.forEach((k, v) -> {
502
-                if (v instanceof Map) {
503
-                    copy.put(k, deepCopyMap((Map<String, Object>) v));
504
-                } else {
505
-                    copy.put(k, v);
506
-                }
507
-            });
508
-            return copy;
509
-        }
510
-    }
511
-
512
-    public void disconnect() {
513
-        synchronized (lock) {
514
-            if (mqttClient != null) {
515
-                try {
516
-                    if (mqttClient.isConnected()) {
517
-                        unsubscribeAll();
518
-                        mqttClient.disconnect(10000);
519
-                    }
520
-                    mqttClient.close();
521
-                    log.info("MQTT 连接已断开");
522
-                } catch (MqttException e) {
523
-                    log.error("断开 MQTT 连接失败:" + e.getMessage());
524
-                } finally {
525
-                    isConnected.set(false);
526
-                    mqttClient = null;
527
-                }
528
-            }
529
-            if (tdengineService != null) tdengineService.close();
530
-        }
531
-    }
532
-
533
-    @PreDestroy
534
-    public void destroy() {
535
-        log.info(">>> 服务正在关闭...");
536
-        disconnect();
537
-    }
538
-}
131
+}

+ 1
- 0
iot-platform/src/main/resources/application.yml Прегледај датотеку

@@ -68,6 +68,7 @@ iot:
68 68
     broker-url: tcp://47.104.204.180:1883
69 69
     username: ${MQTT_USERNAME:}
70 70
     password: ${MQTT_PASSWORD:}
71
+    charge-station-topic: ${MQTT_CHARGE_STATION_TOPIC:station/ChargeStation/device/+/post/json}
71 72
   tdengine:
72 73
     url: jdbc:TAOS://localhost:6030/
73 74
     username: ${TDENGINE_USERNAME:}

Loading…
Откажи
Сачувај