Преглед на файлове

refactor: P0-P1 complete — field injection, logging, hardcoded values, resource mgmt

P0 fixes:
- MqttFaultConsumer: array index bounds check, NPE guard on null coordinates
- MqttStatusConsumer: NPE guard on null required fields
- TDegnineAlarm: ResultSet try-with-resources, SQL injection whitelist
- AbstractMqttConsumer: brokerUrl parsing validation, mqttPassword null guard

P1 fixes:
- Field injection → constructor injection (7 files: MqttFaultConsumer,
  MqttStatusConsumer, SysIndicatorsService, SysWorkorderService,
  TDengineService, TDegnineAlarm, AbstractMqttConsumer)
- SLF4J {} placeholder logging (4 places in AbstractMqttConsumer)
- Extract hardcoded constants: alarm status, date format, company ID prefix,
  webhook URL → IotProperties configuration
- TDengineService: dataSourceInitialized volatile, GZIP try-with-resources

Tests:
- Fix MqttStatusConsumerTest/MqttFaultConsumerTest/MqttGenericConsumerTest
  for new constructors (remove @InjectMocks, manual instantiation)
- 65/67 tests passing (2 TDengine JNI env-dependent failures excluded)
mqy20260511
humanleft преди 4 дни
родител
ревизия
e6338b3ef4

+ 9
- 0
iot-platform/src/main/java/com/iot/platform/config/IotProperties.java Целия файл

59
         private String username = "";
59
         private String username = "";
60
         private String password = "";
60
         private String password = "";
61
         private String chargeStationTopic = "station/ChargeStation/device/+/post/json";
61
         private String chargeStationTopic = "station/ChargeStation/device/+/post/json";
62
+        private String alarmWebhookUrl = "https://esos-iot.com:9443/syscar/gaojing";
62
 
63
 
63
         public String getBrokerUrl() {
64
         public String getBrokerUrl() {
64
             return brokerUrl;
65
             return brokerUrl;
91
         public void setChargeStationTopic(String chargeStationTopic) {
92
         public void setChargeStationTopic(String chargeStationTopic) {
92
             this.chargeStationTopic = chargeStationTopic;
93
             this.chargeStationTopic = chargeStationTopic;
93
         }
94
         }
95
+
96
+        public String getAlarmWebhookUrl() {
97
+            return alarmWebhookUrl;
98
+        }
99
+
100
+        public void setAlarmWebhookUrl(String alarmWebhookUrl) {
101
+            this.alarmWebhookUrl = alarmWebhookUrl;
102
+        }
94
     }
103
     }
95
 
104
 
96
     /**
105
     /**

+ 13
- 8
iot-platform/src/main/java/com/iot/platform/mqtt/AbstractMqttConsumer.java Целия файл

17
 
17
 
18
     protected final Logger log = LoggerFactory.getLogger(getClass());
18
     protected final Logger log = LoggerFactory.getLogger(getClass());
19
 
19
 
20
-    @Autowired
21
-    protected IotProperties iotProperties;
22
-
20
+    protected final IotProperties iotProperties;
23
     protected final ExecutorService executorService;
21
     protected final ExecutorService executorService;
24
 
22
 
25
     private String brokerUrl;
23
     private String brokerUrl;
37
     protected MqttConnectOptions connOpts;
35
     protected MqttConnectOptions connOpts;
38
     protected volatile boolean isMqttConnected = false;
36
     protected volatile boolean isMqttConnected = false;
39
 
37
 
40
-    protected AbstractMqttConsumer(ExecutorService executorService) {
38
+    protected AbstractMqttConsumer(ExecutorService executorService, IotProperties iotProperties) {
41
         this.executorService = executorService;
39
         this.executorService = executorService;
40
+        this.iotProperties = iotProperties;
42
     }
41
     }
43
 
42
 
44
     protected abstract String getSubscribeTopic();
43
     protected abstract String getSubscribeTopic();
56
         this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
55
         this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
57
         String brokerAddr = this.brokerUrl.replace("tcp://", "");
56
         String brokerAddr = this.brokerUrl.replace("tcp://", "");
58
         int colonIdx = brokerAddr.lastIndexOf(':');
57
         int colonIdx = brokerAddr.lastIndexOf(':');
58
+        if (colonIdx <= 0 || colonIdx == brokerAddr.length() - 1) {
59
+            throw new IllegalArgumentException("MQTT broker-url 格式无效,期望 tcp://host:port,实际: " + this.brokerUrl);
60
+        }
59
         this.brokerHost = brokerAddr.substring(0, colonIdx);
61
         this.brokerHost = brokerAddr.substring(0, colonIdx);
60
         this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
62
         this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
61
         this.mqttUsername = iotProperties.getMqtt().getUsername();
63
         this.mqttUsername = iotProperties.getMqtt().getUsername();
99
         connOpts.setAutomaticReconnect(true);
101
         connOpts.setAutomaticReconnect(true);
100
         connOpts.setConnectionTimeout(10);
102
         connOpts.setConnectionTimeout(10);
101
         connOpts.setUserName(mqttUsername);
103
         connOpts.setUserName(mqttUsername);
104
+        if (mqttPassword == null) {
105
+            mqttPassword = "";
106
+        }
102
         connOpts.setPassword(mqttPassword.toCharArray());
107
         connOpts.setPassword(mqttPassword.toCharArray());
103
     }
108
     }
104
 
109
 
106
         mqttClient.setCallback(new MqttCallback() {
111
         mqttClient.setCallback(new MqttCallback() {
107
             @Override
112
             @Override
108
             public void connectionLost(Throwable cause) {
113
             public void connectionLost(Throwable cause) {
109
-                log.error("MQTT连接断开,开始重连:" + cause.getMessage());
114
+                log.error("MQTT连接断开,开始重连: {}", cause.getMessage());
110
                 isMqttConnected = false;
115
                 isMqttConnected = false;
111
                 reconnect();
116
                 reconnect();
112
             }
117
             }
138
             if (connectToken.isComplete()) {
143
             if (connectToken.isComplete()) {
139
                 mqttClient.subscribe(getSubscribeTopic(), QOS);
144
                 mqttClient.subscribe(getSubscribeTopic(), QOS);
140
                 isMqttConnected = true;
145
                 isMqttConnected = true;
141
-                log.info("MQTT连接成功,已订阅主题:" + getSubscribeTopic());
146
+                log.info("MQTT连接成功,已订阅主题: {}", getSubscribeTopic());
142
             }
147
             }
143
         }
148
         }
144
     }
149
     }
152
                     mqttClient.connect(connOpts);
157
                     mqttClient.connect(connOpts);
153
                     mqttClient.subscribe(getSubscribeTopic(), QOS);
158
                     mqttClient.subscribe(getSubscribeTopic(), QOS);
154
                     isMqttConnected = true;
159
                     isMqttConnected = true;
155
-                    log.info("MQTT重连成功(第" + attempt + "次尝试)");
160
+                    log.info("MQTT重连成功(第{}次尝试)", attempt);
156
                     break;
161
                     break;
157
                 }
162
                 }
158
             } catch (MqttException | InterruptedException e) {
163
             } catch (MqttException | InterruptedException e) {
159
-                log.error("MQTT重连失败(第" + attempt + "次尝试):" + e.getMessage());
164
+                log.error("MQTT重连失败(第{}次尝试): {}", attempt, e.getMessage());
160
                 if (attempt == maxReconnectAttempts) {
165
                 if (attempt == maxReconnectAttempts) {
161
                     log.error("已达最大重连次数,停止重连");
166
                     log.error("已达最大重连次数,停止重连");
162
                 }
167
                 }

+ 63
- 38
iot-platform/src/main/java/com/iot/platform/mqtt/MqttFaultConsumer.java Целия файл

1
 package com.iot.platform.mqtt;
1
 package com.iot.platform.mqtt;
2
 
2
 
3
 import com.fasterxml.jackson.databind.ObjectMapper;
3
 import com.fasterxml.jackson.databind.ObjectMapper;
4
+import com.iot.platform.config.IotProperties;
4
 import com.iot.platform.domain.SysDevice;
5
 import com.iot.platform.domain.SysDevice;
5
 import com.iot.platform.domain.SysFault;
6
 import com.iot.platform.domain.SysFault;
6
 import com.iot.platform.service.*;
7
 import com.iot.platform.service.*;
23
 @Component
24
 @Component
24
 public class MqttFaultConsumer extends AbstractMqttConsumer {
25
 public class MqttFaultConsumer extends AbstractMqttConsumer {
25
 
26
 
26
-    @Autowired
27
-    public SysControllerService sysControllerService;
28
-    @Autowired
29
-    public SysFaultService sysFaultService;
30
-    @Autowired
31
-    public SysrealtimeService sysrealtimeService;
32
-    @Autowired
33
-    public SysWorkorderService sysWorkorderService;
34
-    @Autowired
35
-    public SysAlarmService sysAlarmService;
36
-    @Autowired
37
-    public NumericIdGenerator numericIdGenerator;
38
-    @Autowired
39
-    public TDegnineAlarm tDegnineAlarm;
40
-    @Autowired
41
-    private RestTemplate restTemplate;
27
+    private final SysControllerService sysControllerService;
28
+    private final SysFaultService sysFaultService;
29
+    private final SysrealtimeService sysrealtimeService;
30
+    private final SysWorkorderService sysWorkorderService;
31
+    private final SysAlarmService sysAlarmService;
32
+    private final NumericIdGenerator numericIdGenerator;
33
+    private final TDegnineAlarm tDegnineAlarm;
34
+    private final RestTemplate restTemplate;
35
+
36
+    private static final String ALARM_STATUS_TRIGGER = "0";
37
+    private static final String ALARM_STATUS_RECOVERED = "1";
38
+    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
39
+    private static final String COMPANY_ID_PREFIX = "GJ";
42
 
40
 
43
     private final ObjectMapper objectMapper = new ObjectMapper();
41
     private final ObjectMapper objectMapper = new ObjectMapper();
44
     private final ExecutorService mqttFaultExecutor;
42
     private final ExecutorService mqttFaultExecutor;
45
 
43
 
46
     @Autowired
44
     @Autowired
47
     public MqttFaultConsumer(@Qualifier("mqttFaultExecutor") ExecutorService mqttFaultExecutor,
45
     public MqttFaultConsumer(@Qualifier("mqttFaultExecutor") ExecutorService mqttFaultExecutor,
48
-                             @Qualifier("abstractConsumerExecutor") ExecutorService executorService) {
49
-        super(executorService);
46
+                             @Qualifier("abstractConsumerExecutor") ExecutorService executorService,
47
+                             IotProperties iotProperties,
48
+                             SysControllerService sysControllerService,
49
+                             SysFaultService sysFaultService,
50
+                             SysrealtimeService sysrealtimeService,
51
+                             SysWorkorderService sysWorkorderService,
52
+                             SysAlarmService sysAlarmService,
53
+                             NumericIdGenerator numericIdGenerator,
54
+                             TDegnineAlarm tDegnineAlarm,
55
+                             RestTemplate restTemplate) {
56
+        super(executorService, iotProperties);
50
         this.mqttFaultExecutor = mqttFaultExecutor;
57
         this.mqttFaultExecutor = mqttFaultExecutor;
58
+        this.sysControllerService = sysControllerService;
59
+        this.sysFaultService = sysFaultService;
60
+        this.sysrealtimeService = sysrealtimeService;
61
+        this.sysWorkorderService = sysWorkorderService;
62
+        this.sysAlarmService = sysAlarmService;
63
+        this.numericIdGenerator = numericIdGenerator;
64
+        this.tDegnineAlarm = tDegnineAlarm;
65
+        this.restTemplate = restTemplate;
51
     }
66
     }
52
 
67
 
53
     private static final Map<String, String> KEY_MAPPING = new HashMap<>();
68
     private static final Map<String, String> KEY_MAPPING = new HashMap<>();
82
     }
97
     }
83
 
98
 
84
     public void insertTDegine(Map<String, Object> weather, String topic) throws SQLException {
99
     public void insertTDegine(Map<String, Object> weather, String topic) throws SQLException {
100
+        String[] topicParts = topic.split("/");
101
+        if (topicParts.length < 2) {
102
+            log.warn("无效的topic格式,缺少分隔符: {}", topic);
103
+            return;
104
+        }
85
         LocalDate localDate = LocalDate.now();
105
         LocalDate localDate = LocalDate.now();
86
         int year = localDate.getYear();
106
         int year = localDate.getYear();
87
         int month = localDate.getMonthValue();
107
         int month = localDate.getMonthValue();
88
-        String supertablename = topic.split("/")[0];
89
-        String table = topic.split("/")[0] + "_" + year + month;
108
+        String supertablename = topicParts[0];
109
+        String table = topicParts[0] + "_" + year + month;
90
 
110
 
91
         Map<String, Object> newMap = new HashMap<>();
111
         Map<String, Object> newMap = new HashMap<>();
92
         for (Map.Entry<String, Object> entry : weather.entrySet()) {
112
         for (Map.Entry<String, Object> entry : weather.entrySet()) {
98
                 newMap.put(originalKey, value);
118
                 newMap.put(originalKey, value);
99
             }
119
             }
100
         }
120
         }
101
-        tDegnineAlarm.shibaihou(newMap, supertablename, table, topic.split("/")[1]);
121
+        tDegnineAlarm.shibaihou(newMap, supertablename, table, topicParts[1]);
102
     }
122
     }
103
 
123
 
104
     public void triggermethod(String topic, SysFault weather) {
124
     public void triggermethod(String topic, SysFault weather) {
114
 
134
 
115
         String controllerId = weather.getController_id();
135
         String controllerId = weather.getController_id();
116
         String[] topics = topic.split("/");
136
         String[] topics = topic.split("/");
137
+        if (topics.length == 0) {
138
+            log.warn("无效的topic格式: {}", topic);
139
+            return;
140
+        }
117
 
141
 
118
         List<String> tablename = sysrealtimeService.selecttables();
142
         List<String> tablename = sysrealtimeService.selecttables();
119
         List<Boolean> a = new ArrayList<>();
143
         List<Boolean> a = new ArrayList<>();
126
             sysFaultService.createmessage(controllername);
150
             sysFaultService.createmessage(controllername);
127
         }
151
         }
128
 
152
 
153
+        SysDevice jingdu = sysControllerService.selectjingweidu(topics[0], "经度");
154
+        SysDevice weidu = sysControllerService.selectjingweidu(topics[0], "纬度");
155
+        if (jingdu == null || weidu == null) {
156
+            log.warn("未查询到控制器经纬度信息: {}", topics[0]);
157
+            return;
158
+        }
159
+        String jingduValue = jingdu.getV();
160
+        String weiduValue = weidu.getV();
161
+        String companyid = COMPANY_ID_PREFIX + numericIdGenerator.nextId();
162
+        LocalDateTime currentTime = LocalDateTime.now();
163
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DATE_TIME_PATTERN);
164
+        String currentTimeStr = currentTime.format(formatter);
165
+
129
         if ("触发".equals(type)) {
166
         if ("触发".equals(type)) {
130
-            SysDevice jingdu = sysControllerService.selectjingweidu(topics[0], "经度");
131
-            SysDevice weidu = sysControllerService.selectjingweidu(topics[0], "纬度");
132
-            String companyid = "GJ" + numericIdGenerator.nextId();
133
-            LocalDateTime currentTime = LocalDateTime.now();
134
-            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
135
-            String currentTimeStr = currentTime.format(formatter);
136
-            sysAlarmService.insertalarm(controllername, companyid, desc, "0", currentTimeStr, "0", controllerId, deviceId, jingdu.getV(), weidu.getV());
137
-            sysFaultService.insertfault(companyid, desc, "0", currentTimeStr, "0", controllerId, deviceId, jingdu.getV(), weidu.getV(), "");
167
+            sysAlarmService.insertalarm(controllername, companyid, desc, ALARM_STATUS_TRIGGER, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId, jingduValue, weiduValue);
168
+            sysFaultService.insertfault(companyid, desc, ALARM_STATUS_TRIGGER, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId, jingduValue, weiduValue, "");
138
         } else if ("恢复".equals(type)) {
169
         } else if ("恢复".equals(type)) {
139
-            SysDevice jingdu = sysControllerService.selectjingweidu(topics[0], "经度");
140
-            SysDevice weidu = sysControllerService.selectjingweidu(topics[0], "纬度");
141
-            String companyid = "GJ" + numericIdGenerator.nextId();
142
-            LocalDateTime currentTime = LocalDateTime.now();
143
-            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
144
-            String currentTimeStr = currentTime.format(formatter);
145
-            sysAlarmService.insertalarm(controllername, companyid, desc, "1", currentTimeStr, "0", controllerId, deviceId, jingdu.getV(), weidu.getV());
146
-            sysFaultService.updatefault("1", "0", jingdu.getV(), weidu.getV(), desc, controllerId, deviceId, currentTimeStr);
170
+            sysAlarmService.insertalarm(controllername, companyid, desc, ALARM_STATUS_RECOVERED, currentTimeStr, ALARM_STATUS_TRIGGER, controllerId, deviceId, jingduValue, weiduValue);
171
+            sysFaultService.updatefault(ALARM_STATUS_RECOVERED, ALARM_STATUS_TRIGGER, jingduValue, weiduValue, desc, controllerId, deviceId, currentTimeStr);
147
         }
172
         }
148
 
173
 
149
-        String url = "https://esos-iot.com:9443/syscar/gaojing?controllerId=" + topics[0];
174
+        String url = iotProperties.getMqtt().getAlarmWebhookUrl() + "?controllerId=" + topics[0];
150
         restTemplate.postForObject(url, null, String.class);
175
         restTemplate.postForObject(url, null, String.class);
151
     }
176
     }
152
 }
177
 }

+ 3
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java Целия файл

1
 package com.iot.platform.mqtt;
1
 package com.iot.platform.mqtt;
2
 
2
 
3
 import com.fasterxml.jackson.databind.ObjectMapper;
3
 import com.fasterxml.jackson.databind.ObjectMapper;
4
+import com.iot.platform.config.IotProperties;
4
 import com.iot.platform.domain.ControllerData;
5
 import com.iot.platform.domain.ControllerData;
5
 import com.iot.platform.domain.topics;
6
 import com.iot.platform.domain.topics;
6
 import com.iot.platform.service.SysControllerService;
7
 import com.iot.platform.service.SysControllerService;
29
 
30
 
30
     @Autowired
31
     @Autowired
31
     public MqttGenericConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService,
32
     public MqttGenericConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService,
33
+                               IotProperties iotProperties,
32
                                StringRedisTemplate stringRedisTemplate,
34
                                StringRedisTemplate stringRedisTemplate,
33
                                SysControllerService sysControllerService,
35
                                SysControllerService sysControllerService,
34
                                MqttDynamicConsumer messageListenerService2) {
36
                                MqttDynamicConsumer messageListenerService2) {
35
-        super(executorService);
37
+        super(executorService, iotProperties);
36
         this.stringRedisTemplate = stringRedisTemplate;
38
         this.stringRedisTemplate = stringRedisTemplate;
37
         this.sysControllerService = sysControllerService;
39
         this.sysControllerService = sysControllerService;
38
         this.messageListenerService2 = messageListenerService2;
40
         this.messageListenerService2 = messageListenerService2;

+ 23
- 11
iot-platform/src/main/java/com/iot/platform/mqtt/MqttStatusConsumer.java Целия файл

1
 package com.iot.platform.mqtt;
1
 package com.iot.platform.mqtt;
2
 
2
 
3
 import com.fasterxml.jackson.databind.ObjectMapper;
3
 import com.fasterxml.jackson.databind.ObjectMapper;
4
+import com.iot.platform.config.IotProperties;
4
 import com.iot.platform.service.SysControllerService;
5
 import com.iot.platform.service.SysControllerService;
5
 import com.iot.platform.service.SysStatusService;
6
 import com.iot.platform.service.SysStatusService;
6
 import org.springframework.beans.factory.annotation.Autowired;
7
 import org.springframework.beans.factory.annotation.Autowired;
19
 @Component
20
 @Component
20
 public class MqttStatusConsumer extends AbstractMqttConsumer {
21
 public class MqttStatusConsumer extends AbstractMqttConsumer {
21
 
22
 
22
-    @Autowired
23
-    private StringRedisTemplate stringRedisTemplate;
24
-    @Autowired
25
-    public SysControllerService sysControllerService;
26
-    @Autowired
27
-    public SysStatusService sysStatusService;
23
+    private final StringRedisTemplate stringRedisTemplate;
24
+    private final SysControllerService sysControllerService;
25
+    private final SysStatusService sysStatusService;
28
 
26
 
29
     private final ObjectMapper objectMapper = new ObjectMapper();
27
     private final ObjectMapper objectMapper = new ObjectMapper();
30
 
28
 
31
     @Autowired
29
     @Autowired
32
-    public MqttStatusConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService) {
33
-        super(executorService);
30
+    public MqttStatusConsumer(@Qualifier("abstractConsumerExecutor") ExecutorService executorService,
31
+                              IotProperties iotProperties,
32
+                              StringRedisTemplate stringRedisTemplate,
33
+                              SysControllerService sysControllerService,
34
+                              SysStatusService sysStatusService) {
35
+        super(executorService, iotProperties);
36
+        this.stringRedisTemplate = stringRedisTemplate;
37
+        this.sysControllerService = sysControllerService;
38
+        this.sysStatusService = sysStatusService;
34
     }
39
     }
35
 
40
 
36
     @Override
41
     @Override
51
     }
56
     }
52
 
57
 
53
     public void triggermethod(Map<String, Object> weather) throws Exception {
58
     public void triggermethod(Map<String, Object> weather) throws Exception {
54
-        String controllerId = weather.get("controller_id").toString();
55
-        String fleetId = weather.get("fleet_id").toString();
56
-        String status = weather.get("status").toString();
59
+        Object controllerIdObj = weather.get("controller_id");
60
+        Object fleetIdObj = weather.get("fleet_id");
61
+        Object statusObj = weather.get("status");
62
+        if (controllerIdObj == null || fleetIdObj == null || statusObj == null) {
63
+            log.warn("MQTT状态消息缺少必填字段,跳过处理");
64
+            return;
65
+        }
66
+        String controllerId = controllerIdObj.toString();
67
+        String fleetId = fleetIdObj.toString();
68
+        String status = statusObj.toString();
57
 
69
 
58
         LocalDateTime currentTime = LocalDateTime.now();
70
         LocalDateTime currentTime = LocalDateTime.now();
59
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
71
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

+ 5
- 1
iot-platform/src/main/java/com/iot/platform/service/SysIndicatorsService.java Целия файл

11
 @DataSource(value = DataSourceType.SLAVE)
11
 @DataSource(value = DataSourceType.SLAVE)
12
 public class SysIndicatorsService {
12
 public class SysIndicatorsService {
13
 
13
 
14
+    private final SysIndicatorsMapper sysIndicatorsMapper;
15
+
14
     @Autowired
16
     @Autowired
15
-    public SysIndicatorsMapper sysIndicatorsMapper;
17
+    public SysIndicatorsService(SysIndicatorsMapper sysIndicatorsMapper) {
18
+        this.sysIndicatorsMapper = sysIndicatorsMapper;
19
+    }
16
 
20
 
17
 
21
 
18
     public void insertindicators(Integer workordercount, Double profit, String carId,String createdata){
22
     public void insertindicators(Integer workordercount, Double profit, String carId,String createdata){

+ 6
- 1
iot-platform/src/main/java/com/iot/platform/service/SysWorkorderService.java Целия файл

9
 @Service
9
 @Service
10
 @DataSource(value = DataSourceType.SLAVE)
10
 @DataSource(value = DataSourceType.SLAVE)
11
 public class SysWorkorderService {
11
 public class SysWorkorderService {
12
+
13
+    private final SysWorkorderMapper sysWorkorderMapper;
14
+
12
     @Autowired
15
     @Autowired
13
-    public SysWorkorderMapper sysWorkorderMapper;
16
+    public SysWorkorderService(SysWorkorderMapper sysWorkorderMapper) {
17
+        this.sysWorkorderMapper = sysWorkorderMapper;
18
+    }
14
 
19
 
15
 
20
 
16
 
21
 

+ 36
- 32
iot-platform/src/main/java/com/iot/platform/service/TDegnineAlarm.java Целия файл

14
 
14
 
15
     private static final Logger log = LoggerFactory.getLogger(TDegnineAlarm.class);
15
     private static final Logger log = LoggerFactory.getLogger(TDegnineAlarm.class);
16
 
16
 
17
-    @Autowired
18
-    private TDengineService tdengineService;
17
+    private final TDengineService tdengineService;
18
+
19
+    public TDegnineAlarm(TDengineService tdengineService) {
20
+        this.tdengineService = tdengineService;
21
+    }
19
 
22
 
20
     private static final String VALID_FIELD_PATTERN = "^[a-zA-Z0-9_]+$";
23
     private static final String VALID_FIELD_PATTERN = "^[a-zA-Z0-9_]+$";
21
 
24
 
37
      * 根据标签获取表的名称,添加数据到TDengine时序数据中
40
      * 根据标签获取表的名称,添加数据到TDengine时序数据中
38
      */
41
      */
39
     public void shibaihou(Map<String, Object> shuzi, String supertablename, String table, String deviceid) throws SQLException {
42
     public void shibaihou(Map<String, Object> shuzi, String supertablename, String table, String deviceid) throws SQLException {
43
+        // 白名单校验表名和超级表名(提前到获取连接前)
44
+        if (!isValidFieldName(supertablename) || !isValidFieldName(table)) {
45
+            log.warn("非法表名或超级表名: supertablename={}, table={}", supertablename, table);
46
+            return;
47
+        }
48
+
40
         Connection conn = null;
49
         Connection conn = null;
41
         Statement stmt = null;
50
         Statement stmt = null;
42
 
51
 
44
             conn = tdengineService.getConnection();
53
             conn = tdengineService.getConnection();
45
             stmt = conn.createStatement();
54
             stmt = conn.createStatement();
46
 
55
 
47
-            // 白名单校验表名和超级表名
48
-            if (!isValidFieldName(supertablename) || !isValidFieldName(table)) {
49
-                log.warn("非法表名或超级表名: supertablename={}, table={}", supertablename, table);
50
-                return;
51
-            }
52
-
53
             // 解析数据value(安全转义)
56
             // 解析数据value(安全转义)
54
             String value = shuzi.values().stream()
57
             String value = shuzi.values().stream()
55
                     .map(obj -> "'" + escapeValue(obj == null ? "" : obj.toString()) + "'")
58
                     .map(obj -> "'" + escapeValue(obj == null ? "" : obj.toString()) + "'")
87
             StringBuilder u = new StringBuilder();
90
             StringBuilder u = new StringBuilder();
88
 
91
 
89
             // ========== 查询超级表是否存在 ==========
92
             // ========== 查询超级表是否存在 ==========
90
-            boolean s = false;
93
+            boolean stableExists = false;
91
             List<String> tableNamesc = new ArrayList<>();
94
             List<String> tableNamesc = new ArrayList<>();
92
             try {
95
             try {
93
                 stmt.executeUpdate("use fault");
96
                 stmt.executeUpdate("use fault");
94
-                ResultSet resultSet = stmt.executeQuery("SHOW stables");
95
-                while (resultSet.next()) {
96
-                    tableNamesc.add(resultSet.getString(1));
97
+                try (ResultSet resultSet = stmt.executeQuery("SHOW stables")) {
98
+                    while (resultSet.next()) {
99
+                        tableNamesc.add(resultSet.getString(1));
100
+                    }
97
                 }
101
                 }
98
-                resultSet.close();
99
             } catch (SQLException e) {
102
             } catch (SQLException e) {
100
                 log.warn("SHOW stables 失败: {}", e.getMessage());
103
                 log.warn("SHOW stables 失败: {}", e.getMessage());
101
             }
104
             }
102
             for (String ta : tableNamesc) {
105
             for (String ta : tableNamesc) {
103
                 if (supertablename.equalsIgnoreCase(ta)) {
106
                 if (supertablename.equalsIgnoreCase(ta)) {
104
-                    s = true;
107
+                    stableExists = true;
108
+                    break;
105
                 }
109
                 }
106
             }
110
             }
107
 
111
 
108
             // ========== 超级表存在,查看子表是否存在 ==========
112
             // ========== 超级表存在,查看子表是否存在 ==========
109
-            if (s) {
113
+            if (stableExists) {
110
                 String querySubTable = "select distinct tbname from " + wrapName(table)
114
                 String querySubTable = "select distinct tbname from " + wrapName(table)
111
                         + " where location='" + escapeValue(supertablename) + "'";
115
                         + " where location='" + escapeValue(supertablename) + "'";
112
-                ResultSet rsc = stmt.executeQuery(querySubTable);
113
-                while (rsc.next()) {
114
-                    zibiaoname.add(rsc.getString(1));
116
+                try (ResultSet rsc = stmt.executeQuery(querySubTable)) {
117
+                    while (rsc.next()) {
118
+                        zibiaoname.add(rsc.getString(1));
119
+                    }
115
                 }
120
                 }
116
-                rsc.close();
117
-                boolean zibiaobu = zibiaoname.isEmpty();
121
+                boolean subTableMissing = zibiaoname.isEmpty();
118
 
122
 
119
                 // ========== 添加数据到数据库中 ==========
123
                 // ========== 添加数据到数据库中 ==========
120
                 try {
124
                 try {
121
-                    if (!zibiaobu) {
125
+                    if (!subTableMissing) {
122
                         // 子表存在:查询列、新增差异列、插入数据
126
                         // 子表存在:查询列、新增差异列、插入数据
123
-                        ResultSet resultSet = stmt.executeQuery("describe " + wrapName(table));
124
-                        int columnNameIndex = resultSet.findColumn("field");
125
-                        while (resultSet.next()) {
126
-                            columnNames.add(resultSet.getString(columnNameIndex));
127
+                        try (ResultSet resultSet = stmt.executeQuery("describe " + wrapName(table))) {
128
+                            int columnNameIndex = resultSet.findColumn("field");
129
+                            while (resultSet.next()) {
130
+                                columnNames.add(resultSet.getString(columnNameIndex));
131
+                            }
127
                         }
132
                         }
128
-                        resultSet.close();
129
 
133
 
130
                         for (int z = 0; z < columnNames.size(); z++) {
134
                         for (int z = 0; z < columnNames.size(); z++) {
131
                             u.append(columnNames.get(z));
135
                             u.append(columnNames.get(z));
149
                                     stmt.executeUpdate("ALTER TABLE " + wrapName(supertablename)
153
                                     stmt.executeUpdate("ALTER TABLE " + wrapName(supertablename)
150
                                             + " ADD COLUMN " + setCopys[ar]);
154
                                             + " ADD COLUMN " + setCopys[ar]);
151
                                 } catch (Exception e) {
155
                                 } catch (Exception e) {
152
-                                    // 列可能已存在,忽略
156
+                                    log.debug("列可能已存在,忽略: {}", e.getMessage());
153
                                 }
157
                                 }
154
                             }
158
                             }
155
                         }
159
                         }
162
                                 + " using " + wrapName(supertablename)
166
                                 + " using " + wrapName(supertablename)
163
                                 + " tags('" + escapeValue(supertablename) + "')");
167
                                 + " tags('" + escapeValue(supertablename) + "')");
164
 
168
 
165
-                        ResultSet resultSet = stmt.executeQuery("describe " + wrapName(supertablename));
166
-                        int columnNameIndex = resultSet.findColumn("field");
167
-                        while (resultSet.next()) {
168
-                            columnNames.add(resultSet.getString(columnNameIndex));
169
+                        try (ResultSet resultSet = stmt.executeQuery("describe " + wrapName(supertablename))) {
170
+                            int columnNameIndex = resultSet.findColumn("field");
171
+                            while (resultSet.next()) {
172
+                                columnNames.add(resultSet.getString(columnNameIndex));
173
+                            }
169
                         }
174
                         }
170
-                        resultSet.close();
171
 
175
 
172
                         for (int z = 0; z < columnNames.size(); z++) {
176
                         for (int z = 0; z < columnNames.size(); z++) {
173
                             if (z < columnNames.size() - 1) {
177
                             if (z < columnNames.size() - 1) {

+ 7
- 8
iot-platform/src/main/java/com/iot/platform/service/TDengineService.java Целия файл

20
 public class TDengineService {
20
 public class TDengineService {
21
     private static final Logger log = LoggerFactory.getLogger(TDengineService.class);
21
     private static final Logger log = LoggerFactory.getLogger(TDengineService.class);
22
 
22
 
23
-    @Autowired
24
-    private IotProperties iotProperties;
25
-
23
+    private final IotProperties iotProperties;
26
     private final ExecutorService batchExecutor;
24
     private final ExecutorService batchExecutor;
27
 
25
 
28
     @Autowired
26
     @Autowired
29
-    public TDengineService(ExecutorService tdengineBatchExecutor) {
27
+    public TDengineService(ExecutorService tdengineBatchExecutor, IotProperties iotProperties) {
30
         this.batchExecutor = tdengineBatchExecutor;
28
         this.batchExecutor = tdengineBatchExecutor;
29
+        this.iotProperties = iotProperties;
31
     }
30
     }
32
 
31
 
33
     private HikariDataSource dataSource;
32
     private HikariDataSource dataSource;
34
-    private boolean dataSourceInitialized = false;
33
+    private volatile boolean dataSourceInitialized = false;
35
 
34
 
36
     // === 新增:缓存超级表结构 (key = dbName.stableName) ===
35
     // === 新增:缓存超级表结构 (key = dbName.stableName) ===
37
     private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
36
     private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
423
         if (data == null || data.isEmpty()) return "";
422
         if (data == null || data.isEmpty()) return "";
424
         try {
423
         try {
425
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
424
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
426
-            GZIPOutputStream gzip = new GZIPOutputStream(bos);
427
-            gzip.write(data.getBytes("UTF-8"));
428
-            gzip.close();
425
+            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
426
+                gzip.write(data.getBytes("UTF-8"));
427
+            }
429
             byte[] compressed = Base64.getEncoder().encode(bos.toByteArray());
428
             byte[] compressed = Base64.getEncoder().encode(bos.toByteArray());
430
             return new String(compressed);
429
             return new String(compressed);
431
         } catch (Exception e) {
430
         } catch (Exception e) {

+ 13
- 12
iot-platform/src/test/java/com/iot/platform/mqtt/MqttFaultConsumerTest.java Целия файл

2
 
2
 
3
 import com.fasterxml.jackson.databind.ObjectMapper;
3
 import com.fasterxml.jackson.databind.ObjectMapper;
4
 import com.iot.platform.common.utils.NumericIdGenerator;
4
 import com.iot.platform.common.utils.NumericIdGenerator;
5
+import com.iot.platform.config.IotProperties;
5
 import com.iot.platform.domain.SysDevice;
6
 import com.iot.platform.domain.SysDevice;
6
 import com.iot.platform.domain.SysFault;
7
 import com.iot.platform.domain.SysFault;
7
 import com.iot.platform.service.*;
8
 import com.iot.platform.service.*;
9
 import org.junit.jupiter.api.DisplayName;
10
 import org.junit.jupiter.api.DisplayName;
10
 import org.junit.jupiter.api.Test;
11
 import org.junit.jupiter.api.Test;
11
 import org.junit.jupiter.api.extension.ExtendWith;
12
 import org.junit.jupiter.api.extension.ExtendWith;
12
-import org.mockito.InjectMocks;
13
 import org.mockito.Mock;
13
 import org.mockito.Mock;
14
 import org.mockito.junit.jupiter.MockitoExtension;
14
 import org.mockito.junit.jupiter.MockitoExtension;
15
 import org.mockito.junit.jupiter.MockitoSettings;
15
 import org.mockito.junit.jupiter.MockitoSettings;
16
 import org.mockito.quality.Strictness;
16
 import org.mockito.quality.Strictness;
17
-import org.springframework.test.util.ReflectionTestUtils;
18
 import org.springframework.web.client.RestTemplate;
17
 import org.springframework.web.client.RestTemplate;
19
 
18
 
20
 import java.sql.SQLException;
19
 import java.sql.SQLException;
21
 import java.time.LocalDate;
20
 import java.time.LocalDate;
22
-import java.time.format.DateTimeFormatter;
23
 import java.util.Collections;
21
 import java.util.Collections;
24
 import java.util.HashMap;
22
 import java.util.HashMap;
25
 import java.util.Map;
23
 import java.util.Map;
57
     @Mock
55
     @Mock
58
     private RestTemplate restTemplate;
56
     private RestTemplate restTemplate;
59
 
57
 
58
+    @Mock
59
+    private IotProperties iotProperties;
60
+
60
     @Mock(name = "mqttFaultExecutor")
61
     @Mock(name = "mqttFaultExecutor")
61
     private ExecutorService mqttFaultExecutor;
62
     private ExecutorService mqttFaultExecutor;
62
 
63
 
63
     @Mock(name = "abstractConsumerExecutor")
64
     @Mock(name = "abstractConsumerExecutor")
64
     private ExecutorService abstractConsumerExecutor;
65
     private ExecutorService abstractConsumerExecutor;
65
 
66
 
66
-    @InjectMocks
67
     private MqttFaultConsumer mqttFaultConsumer;
67
     private MqttFaultConsumer mqttFaultConsumer;
68
 
68
 
69
     private final ObjectMapper objectMapper = new ObjectMapper();
69
     private final ObjectMapper objectMapper = new ObjectMapper();
70
 
70
 
71
     @BeforeEach
71
     @BeforeEach
72
     void setUp() {
72
     void setUp() {
73
-        ReflectionTestUtils.setField(mqttFaultConsumer, "sysControllerService", sysControllerService);
74
-        ReflectionTestUtils.setField(mqttFaultConsumer, "sysFaultService", sysFaultService);
75
-        ReflectionTestUtils.setField(mqttFaultConsumer, "sysrealtimeService", sysrealtimeService);
76
-        ReflectionTestUtils.setField(mqttFaultConsumer, "sysWorkorderService", sysWorkorderService);
77
-        ReflectionTestUtils.setField(mqttFaultConsumer, "sysAlarmService", sysAlarmService);
78
-        ReflectionTestUtils.setField(mqttFaultConsumer, "numericIdGenerator", numericIdGenerator);
79
-        ReflectionTestUtils.setField(mqttFaultConsumer, "tDegnineAlarm", tDegnineAlarm);
80
-        ReflectionTestUtils.setField(mqttFaultConsumer, "restTemplate", restTemplate);
73
+        mqttFaultConsumer = new MqttFaultConsumer(
74
+                mqttFaultExecutor, abstractConsumerExecutor, iotProperties,
75
+                sysControllerService, sysFaultService, sysrealtimeService,
76
+                sysWorkorderService, sysAlarmService, numericIdGenerator,
77
+                tDegnineAlarm, restTemplate);
78
+
79
+        IotProperties.Mqtt mqttConfig = new IotProperties.Mqtt();
80
+        mqttConfig.setAlarmWebhookUrl("https://esos-iot.com:9443/syscar/gaojing");
81
+        when(iotProperties.getMqtt()).thenReturn(mqttConfig);
81
 
82
 
82
         // executor mock runs submitted tasks synchronously
83
         // executor mock runs submitted tasks synchronously
83
         lenient().when(mqttFaultExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {
84
         lenient().when(mqttFaultExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {

+ 3
- 1
iot-platform/src/test/java/com/iot/platform/mqtt/MqttGenericConsumerTest.java Целия файл

1
 package com.iot.platform.mqtt;
1
 package com.iot.platform.mqtt;
2
 
2
 
3
+import com.iot.platform.config.IotProperties;
3
 import com.iot.platform.service.SysControllerService;
4
 import com.iot.platform.service.SysControllerService;
4
 import org.junit.jupiter.api.DisplayName;
5
 import org.junit.jupiter.api.DisplayName;
5
 import org.junit.jupiter.api.Test;
6
 import org.junit.jupiter.api.Test;
14
 class MqttGenericConsumerTest {
15
 class MqttGenericConsumerTest {
15
 
16
 
16
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
17
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
18
+    private final IotProperties iotProperties = mock(IotProperties.class);
17
     private final StringRedisTemplate stringRedisTemplate = mock(StringRedisTemplate.class);
19
     private final StringRedisTemplate stringRedisTemplate = mock(StringRedisTemplate.class);
18
     private final SysControllerService sysControllerService = mock(SysControllerService.class);
20
     private final SysControllerService sysControllerService = mock(SysControllerService.class);
19
     private final MqttDynamicConsumer mqttDynamicConsumer = mock(MqttDynamicConsumer.class);
21
     private final MqttDynamicConsumer mqttDynamicConsumer = mock(MqttDynamicConsumer.class);
20
 
22
 
21
     private MqttGenericConsumer createConsumer() {
23
     private MqttGenericConsumer createConsumer() {
22
-        return new MqttGenericConsumer(executorService, stringRedisTemplate, sysControllerService, mqttDynamicConsumer);
24
+        return new MqttGenericConsumer(executorService, iotProperties, stringRedisTemplate, sysControllerService, mqttDynamicConsumer);
23
     }
25
     }
24
 
26
 
25
     @Test
27
     @Test

+ 18
- 18
iot-platform/src/test/java/com/iot/platform/mqtt/MqttStatusConsumerTest.java Целия файл

1
 package com.iot.platform.mqtt;
1
 package com.iot.platform.mqtt;
2
 
2
 
3
+import com.iot.platform.config.IotProperties;
3
 import com.iot.platform.service.SysControllerService;
4
 import com.iot.platform.service.SysControllerService;
4
 import com.iot.platform.service.SysStatusService;
5
 import com.iot.platform.service.SysStatusService;
5
 import org.junit.jupiter.api.BeforeEach;
6
 import org.junit.jupiter.api.BeforeEach;
6
 import org.junit.jupiter.api.DisplayName;
7
 import org.junit.jupiter.api.DisplayName;
7
 import org.junit.jupiter.api.Test;
8
 import org.junit.jupiter.api.Test;
8
 import org.junit.jupiter.api.extension.ExtendWith;
9
 import org.junit.jupiter.api.extension.ExtendWith;
9
-import org.mockito.InjectMocks;
10
 import org.mockito.Mock;
10
 import org.mockito.Mock;
11
 import org.mockito.junit.jupiter.MockitoExtension;
11
 import org.mockito.junit.jupiter.MockitoExtension;
12
 import org.mockito.junit.jupiter.MockitoSettings;
12
 import org.mockito.junit.jupiter.MockitoSettings;
13
 import org.mockito.quality.Strictness;
13
 import org.mockito.quality.Strictness;
14
 import org.springframework.data.redis.core.HashOperations;
14
 import org.springframework.data.redis.core.HashOperations;
15
 import org.springframework.data.redis.core.StringRedisTemplate;
15
 import org.springframework.data.redis.core.StringRedisTemplate;
16
-import org.springframework.test.util.ReflectionTestUtils;
17
 
16
 
18
 import java.util.HashMap;
17
 import java.util.HashMap;
19
 import java.util.Map;
18
 import java.util.Map;
46
     @Mock
45
     @Mock
47
     private HashOperations<String, Object, Object> hashOperations;
46
     private HashOperations<String, Object, Object> hashOperations;
48
 
47
 
49
-    @InjectMocks
48
+    @Mock
49
+    private IotProperties iotProperties;
50
+
50
     private MqttStatusConsumer mqttStatusConsumer;
51
     private MqttStatusConsumer mqttStatusConsumer;
51
 
52
 
52
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
53
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
53
 
54
 
54
     @BeforeEach
55
     @BeforeEach
55
     void setUp() {
56
     void setUp() {
56
-        ReflectionTestUtils.setField(mqttStatusConsumer, "stringRedisTemplate", stringRedisTemplate);
57
-        ReflectionTestUtils.setField(mqttStatusConsumer, "sysControllerService", sysControllerService);
58
-        ReflectionTestUtils.setField(mqttStatusConsumer, "sysStatusService", sysStatusService);
57
+        mqttStatusConsumer = new MqttStatusConsumer(executorService, iotProperties,
58
+                stringRedisTemplate, sysControllerService, sysStatusService);
59
         lenient().when(stringRedisTemplate.opsForHash()).thenReturn(hashOperations);
59
         lenient().when(stringRedisTemplate.opsForHash()).thenReturn(hashOperations);
60
     }
60
     }
61
 
61
 
152
     }
152
     }
153
 
153
 
154
     @Test
154
     @Test
155
-    @DisplayName("triggermethod with null controller_id throws NullPointerException")
156
-    void triggermethod_nullControllerId_throwsException() {
155
+    @DisplayName("triggermethod with null controller_id returns silently")
156
+    void triggermethod_nullControllerId_returnsSilently() throws Exception {
157
         Map<String, Object> weather = new HashMap<>();
157
         Map<String, Object> weather = new HashMap<>();
158
         weather.put("controller_id", null);
158
         weather.put("controller_id", null);
159
         weather.put("fleet_id", "FLEET_001");
159
         weather.put("fleet_id", "FLEET_001");
160
         weather.put("status", "online");
160
         weather.put("status", "online");
161
 
161
 
162
-        assertThatThrownBy(() -> mqttStatusConsumer.triggermethod(weather))
163
-            .isInstanceOf(NullPointerException.class);
162
+        mqttStatusConsumer.triggermethod(weather);
163
+        verify(sysStatusService, never()).selectstatuscount(anyString());
164
     }
164
     }
165
 
165
 
166
     @Test
166
     @Test
167
-    @DisplayName("triggermethod with null fleet_id throws NullPointerException")
168
-    void triggermethod_nullFleetId_throwsException() {
167
+    @DisplayName("triggermethod with null fleet_id returns silently")
168
+    void triggermethod_nullFleetId_returnsSilently() throws Exception {
169
         Map<String, Object> weather = new HashMap<>();
169
         Map<String, Object> weather = new HashMap<>();
170
         weather.put("controller_id", "CTRL_001");
170
         weather.put("controller_id", "CTRL_001");
171
         weather.put("fleet_id", null);
171
         weather.put("fleet_id", null);
172
         weather.put("status", "online");
172
         weather.put("status", "online");
173
 
173
 
174
-        assertThatThrownBy(() -> mqttStatusConsumer.triggermethod(weather))
175
-            .isInstanceOf(NullPointerException.class);
174
+        mqttStatusConsumer.triggermethod(weather);
175
+        verify(sysStatusService, never()).selectstatuscount(anyString());
176
     }
176
     }
177
 
177
 
178
     @Test
178
     @Test
179
-    @DisplayName("triggermethod with null status throws NullPointerException")
180
-    void triggermethod_nullStatus_throwsException() {
179
+    @DisplayName("triggermethod with null status returns silently")
180
+    void triggermethod_nullStatus_returnsSilently() throws Exception {
181
         Map<String, Object> weather = new HashMap<>();
181
         Map<String, Object> weather = new HashMap<>();
182
         weather.put("controller_id", "CTRL_001");
182
         weather.put("controller_id", "CTRL_001");
183
         weather.put("fleet_id", "FLEET_001");
183
         weather.put("fleet_id", "FLEET_001");
184
         weather.put("status", null);
184
         weather.put("status", null);
185
 
185
 
186
-        assertThatThrownBy(() -> mqttStatusConsumer.triggermethod(weather))
187
-            .isInstanceOf(NullPointerException.class);
186
+        mqttStatusConsumer.triggermethod(weather);
187
+        verify(sysStatusService, never()).selectstatuscount(anyString());
188
     }
188
     }
189
 
189
 
190
     @Test
190
     @Test

Loading…
Отказ
Запис