Parcourir la source

fix: resolve merge conflict in MqttGenericConsumer

Remote branch overwrote the AbstractMqttConsumer inheritance structure.
Restored extends AbstractMqttConsumer + constructor injection while
preserving remote's business logic changes (updatecontrollerAccept,
SimpleDateFormat conversion, 6-arg insertsyscontroller).
mqy20260511
humanleft il y a 4 jours
Parent
révision
c5351f22bf

+ 28
- 185
iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java Voir le fichier

@@ -1,50 +1,25 @@
1 1
 package com.iot.platform.mqtt;
2
-import com.alibaba.fastjson2.util.DateUtils;
2
+
3 3
 import com.fasterxml.jackson.databind.ObjectMapper;
4 4
 import com.iot.platform.domain.ControllerData;
5 5
 import com.iot.platform.domain.topics;
6 6
 import com.iot.platform.service.SysControllerService;
7
-import com.iot.platform.service.TDengineService;
8
-import org.eclipse.paho.client.mqttv3.*;
9
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
10
-import com.iot.platform.config.IotProperties;
11 7
 import org.springframework.beans.factory.annotation.Autowired;
12 8
 import org.springframework.beans.factory.annotation.Qualifier;
13
-import javax.annotation.PostConstruct;
14
-import javax.annotation.PreDestroy;
15
-import javax.net.ssl.*;
16
-import java.io.InputStream;
17
-import java.net.InetSocketAddress;
18
-import java.net.Socket;
19
-import java.security.KeyStore;
20
-import java.security.cert.CertificateFactory;
21
-import java.security.cert.X509Certificate;
22
-import java.util.*;
23
-import java.util.concurrent.ExecutorService;
24
-import java.util.concurrent.Executors;
25
-import org.springframework.core.io.ClassPathResource;
26 9
 import org.springframework.data.redis.core.StringRedisTemplate;
27
-import org.springframework.scheduling.annotation.Async;
28 10
 import org.springframework.stereotype.Component;
29 11
 
12
+import java.text.SimpleDateFormat;
13
+import java.util.Date;
30 14
 import java.util.List;
31 15
 import java.util.concurrent.ExecutorService;
32
-import org.slf4j.Logger;
33
-import org.slf4j.LoggerFactory;
34 16
 
35 17
 /**
36 18
  * 存储控制器数据
37 19
  */
38 20
 @Component
39
-public class MqttGenericConsumer {
21
+public class MqttGenericConsumer extends AbstractMqttConsumer {
40 22
 
41
-    private static final Logger log = LoggerFactory.getLogger(MqttGenericConsumer.class);
42
-    private static ExecutorService threadPool= Executors.newCachedThreadPool();
43
-    /**
44
-     *
45
-     * 添加mysql数据
46
-     *
47
-     */
48 23
     @Autowired
49 24
     private StringRedisTemplate stringRedisTemplate;
50 25
     @Autowired
@@ -52,8 +27,7 @@ public class MqttGenericConsumer {
52 27
     @Autowired
53 28
     public MqttDynamicConsumer messageListenerService2;
54 29
 
55
-    @Autowired
56
-    private IotProperties iotProperties;
30
+    private final ObjectMapper objectMapper = new ObjectMapper();
57 31
 
58 32
     @Autowired
59 33
     public MqttGenericConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService) {
@@ -65,141 +39,16 @@ public class MqttGenericConsumer {
65 39
         return "+/generics";
66 40
     }
67 41
 
68
-    /**
69
-     * 检测普通MQTT服务器连通性(移除SSL,仅验证TCP端口可达性)
70
-     */
71
-    private void checkServerAvailability() throws InterruptedException {
72
-        boolean serverAvailable = false;
73
-        while (!serverAvailable) {
74
-            try (Socket socket = new Socket()) {
75
-                // 改用普通brokerHost和brokerPort
76
-                socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
77
-                serverAvailable = true;
78
-                log.info("普通MQTT服务器连通性检测通过");
79
-            } catch (Exception e) {
80
-                log.error("普通MQTT服务器不可达,5秒后重试...");
81
-                Thread.sleep(RECONNECT_INTERVAL);
82
-            }
83
-        }
84
-    }
85
-
86
-    /**
87
-     * 初始化MQTT连接选项(移除SSL配置,新增账号密码认证)
88
-     */
89
-    private void initMqttConnectOptions() {
90
-        connOpts = new MqttConnectOptions();
91
-        connOpts.setCleanSession(true);
92
-        connOpts.setAutomaticReconnect(true);
93
-        connOpts.setConnectionTimeout(10);
94
-
95
-        // ========== 新增:配置MQTT账号密码 ==========
96
-        connOpts.setUserName(mqttUsername);
97
-        connOpts.setPassword(mqttPassword.toCharArray()); // 密码要求传入char数组
98
-
99
-        // 移除:原有的SSL配置方法调用(configureSslAndStrictHostnameVerify())
100
-    }
101
-
102
-    /**
103
-     * 按操作系统生成唯一ClientId(保持原有逻辑不变)
104
-     */
105
-    private String generateClientIdByOs() {
42
+    @Override
43
+    protected String generateClientId() {
106 44
         String osName = System.getProperty("os.name").toLowerCase();
107 45
         return osName.contains("windows") ? "mqttx_e216fbf1615" : "mqttx_e216fbf1616";
108 46
     }
109 47
 
110
-    /**
111
-     * 设置MQTT回调函数(保留核心逻辑,修复2个关键bug)
112
-     */
113
-    private void setMqttCallback() {
114
-        mqttClient.setCallback(new MqttCallback() {
115
-            @Override
116
-            public void connectionLost(Throwable cause) {
117
-                log.error("MQTT连接断开,开始重连:" + cause.getMessage());
118
-                isMqttConnected = false;
119
-                reconnect();
120
-            }
121
-
122
-            @Override
123
-            public void messageArrived(String topic, MqttMessage message) throws Exception {
124
-                if (isMqttConnected) {
125
-                    executorService.submit(() -> {
126
-                        try {
127
-                            ObjectMapper objectMapper = new ObjectMapper();
128
-                            // 修复bug1:原代码先读取正确的messageContent,却误用mqtt(message.toString())解析
129
-                            // 正确读取消息负载(UTF-8编码,避免乱码)
130
-                            String messageContent = new String(message.getPayload(), "UTF-8");
131
-                            // 修复bug2:用正确的messageContent解析为ControllerData对象
132
-                            ControllerData controllerData = objectMapper.readValue(messageContent, ControllerData.class);
133
-                            // 业务处理
134
-                            triggermethod(controllerData);
135
-                        } catch (Exception e) {
136
-                            log.error("消息处理失败:", e);
137
-                        }
138
-                    });
139
-                }
140
-            }
141
-
142
-            @Override
143
-            public void deliveryComplete(IMqttDeliveryToken token) {
144
-                // 消息投递完成回调(无需处理)
145
-            }
146
-        });
147
-    }
148
-
149
-    /**
150
-     * 建立MQTT连接并订阅主题(保持逻辑不变,移除SSL日志标识)
151
-     */
152
-    private void connectAndSubscribeTopic() throws MqttException {
153
-        if (!mqttClient.isConnected()) {
154
-            IMqttToken connectToken = mqttClient.connectWithResult(connOpts);
155
-            if (connectToken.isComplete()) {
156
-                mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
157
-                isMqttConnected = true;
158
-                log.info("MQTT连接成功,已订阅主题:" + SUBSCRIBE_TOPIC);
159
-            }
160
-        }
161
-    }
162
-
163
-    /**
164
-     * MQTT重连逻辑(移除SSL日志标识,保持功能不变)
165
-     */
166
-    public void reconnect() {
167
-        int maxReconnectAttempts = 3;
168
-        for (int attempt = 1; attempt <= maxReconnectAttempts; attempt++) {
169
-            try {
170
-                Thread.sleep(RECONNECT_INTERVAL);
171
-                if (mqttClient != null && !mqttClient.isConnected()) {
172
-                    mqttClient.connect(connOpts);
173
-                    mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
174
-                    isMqttConnected = true;
175
-                    log.info("MQTT重连成功(第" + attempt + "次尝试)");
176
-                    break; // 重连成功后退出循环
177
-                }
178
-            } catch (MqttException | InterruptedException e) {
179
-                log.error("MQTT重连失败(第" + attempt + "次尝试):" + e.getMessage());
180
-                if (attempt == maxReconnectAttempts) {
181
-                    log.error("已达最大重连次数,停止重连");
182
-                }
183
-            }
184
-        }
185
-    }
186
-
187
-    /**
188
-     * 销毁时断开MQTT连接,关闭线程池(移除未定义的threadPool.shutdown())
189
-     */
190
-    @PreDestroy
191
-    public void disconnect() {
192
-        try {
193
-            if (mqttClient != null && mqttClient.isConnected()) {
194
-                mqttClient.disconnect();
195
-                mqttClient.close();
196
-                log.info("MQTT连接已断开");
197
-            }
198
-            executorService.shutdown();
199
-            // 移除:原代码中未定义的threadPool.shutdown();(避免编译错误)
200
-        } catch (MqttException e) {
201
-            log.error("MQTT断开连接失败:", e);
202
-        }
48
+    @Override
49
+    protected void handleMessage(String topic, String messageContent) throws Exception {
50
+        ControllerData controllerData = objectMapper.readValue(messageContent, ControllerData.class);
51
+        triggermethod(controllerData);
203 52
     }
204 53
 
205 54
     public void triggermethod(ControllerData weather) throws Exception {
@@ -209,48 +58,42 @@ public class MqttGenericConsumer {
209 58
         List<topics> topics = weather.getTopics();
210 59
         List<topics> cmdtopics = weather.getCmd_topics();
211 60
         topics faultprot = weather.getFault_prot();
212
-        //需要检索全部的数据是否存在,如果存在就进行修改,如果不存在就进行添加
213
-        Integer controllercountcount=0;
61
+
62
+        Integer controllercountcount = 0;
214 63
         for (topics topicsMap : topics) {
215 64
             Integer count = sysControllerService.selectsyscontrollercount(topicsMap.getPath());
216 65
             if (count <= 0) {
217
-                //存储redis
218 66
                 stringRedisTemplate.persist(controllerId);
219
-                stringRedisTemplate.opsForHash().put(controllerId+":"+topicsMap.getName(), "path", topicsMap.getPath());
220
-                //将数据存储到mysql中
221
-                sysControllerService.insertsyscontroller(controllerId, timestamp, fleetId, topicsMap.getName(), topicsMap.getPath(),topicsMap.getPath().split("/")[1]);
67
+                stringRedisTemplate.opsForHash().put(controllerId + ":" + topicsMap.getName(), "path", topicsMap.getPath());
68
+                sysControllerService.insertsyscontroller(controllerId, timestamp, fleetId, topicsMap.getName(), topicsMap.getPath(), topicsMap.getPath().split("/")[1]);
222 69
                 controllercountcount++;
223
-            }else{
224
-                // 毫秒时间戳转换为秒级日期格式
70
+            } else {
225 71
                 long ts = Long.parseLong(timestamp);
226
-                String date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
227
-                        .format(new java.util.Date(ts));
228
-                //修改数据库
229
-                sysControllerService.updatecontrollerAccept(controllerId, timestamp, fleetId, topicsMap.getName(), topicsMap.getPath(),topicsMap.getPath().split("/")[1],date);
72
+                String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(ts));
73
+                sysControllerService.updatecontrollerAccept(controllerId, timestamp, fleetId, topicsMap.getName(), topicsMap.getPath(), topicsMap.getPath().split("/")[1], date);
230 74
             }
231 75
         }
232
-        Integer controllercountcmdcount=0;
76
+
77
+        Integer controllercountcmdcount = 0;
233 78
         for (topics cmdtopicsMap : cmdtopics) {
234
-            //将数据存储到redis
235 79
             Integer count = sysControllerService.selectsyscontrollercountcmd(cmdtopicsMap.getPath());
236
-            if (count<=0) {
237
-                stringRedisTemplate.opsForHash().put(controllerId+"_cmd:"+cmdtopicsMap.getName(), "path", cmdtopicsMap.getPath());
80
+            if (count <= 0) {
81
+                stringRedisTemplate.opsForHash().put(controllerId + "_cmd:" + cmdtopicsMap.getName(), "path", cmdtopicsMap.getPath());
238 82
                 stringRedisTemplate.persist(controllerId);
239 83
                 sysControllerService.insertsyscontrollercmd(controllerId, timestamp, fleetId, cmdtopicsMap.getName(), cmdtopicsMap.getPath());
240 84
                 controllercountcmdcount++;
241 85
             }
242 86
         }
243
-        Integer count =sysControllerService.selectsyscontrollercountfault(faultprot.getPath());
244
-        if (count<=0){
245
-            stringRedisTemplate.opsForHash().put(controllerId+"_fault:"+faultprot.getName(), "path", faultprot.getPath());
87
+
88
+        Integer count = sysControllerService.selectsyscontrollercountfault(faultprot.getPath());
89
+        if (count <= 0) {
90
+            stringRedisTemplate.opsForHash().put(controllerId + "_fault:" + faultprot.getName(), "path", faultprot.getPath());
246 91
             sysControllerService.insertsyscontrollerfault(controllerId, timestamp, fleetId, faultprot.getName(), faultprot.getPath());
247 92
             stringRedisTemplate.persist(controllerId);
248 93
         }
249
-        //如果控制器中的数据出现更新情况,就触发该方法进行重连
250
-        if (controllercountcount>0) {
251
-            //关闭连接线程
94
+
95
+        if (controllercountcount > 0) {
252 96
             messageListenerService2.destroy();
253
-            //重新建立连接
254 97
             messageListenerService2.initMqttConnection();
255 98
         }
256 99
     }

Chargement…
Annuler
Enregistrer