Pārlūkot izejas kodu

refactor(service): 全面优化 TdEngineService 资源管理与代码质量

- 资源管理:loadStableColumnsFromDB / ensureColumnsExist / ensureTableExists
  统一改为 try-with-resources,消除手动 close 的冗长代码
- 缓存策略:getStableColumns 使用 computeIfAbsent 解决多线程竞态条件;
  缓存满时改为淘汰最老的 20% 条目,替代粗暴 clear()
- 列数计算:insertBatchInternal 中扣除 existingColumns 与 columnTypes 的交集,
  避免重复计算导致不必要的插入拒绝
- 缓存更新:ensureColumnsExist 循环结束后统一更新 stableColumnCache
- 连接池配置:IotProperties.TDengine 新增 HikariCP 配置项,initDataSource 改为读取配置
- 日志级别:loadStableColumnsFromDB 表不存在场景 error→debug;
  formatValue 超长截断 info→debug
- ZoneOffset.of("+8") 提取为 static final ZONE_OFFSET_8,避免重复创建
mqy20260511
humanleft 2 nedēļas atpakaļ
vecāks
revīzija
1de1fcc14c

+ 74
- 0
iot-platform/src/main/java/com/iot/platform/config/IotProperties.java Parādīt failu

@@ -119,6 +119,16 @@ public class IotProperties {
119 119
         private String username = "";
120 120
         private String password = "";
121 121
 
122
+        // HikariCP 连接池配置(默认值与之前硬编码保持一致)
123
+        private int maximumPoolSize = 20;
124
+        private int minimumIdle = 5;
125
+        private long connectionTimeout = 10000;
126
+        private long idleTimeout = 30000;
127
+        private long maxLifetime = 600000;
128
+        private long leakDetectionThreshold = 0;
129
+        private long validationTimeout = 3000;
130
+        private String connectionTestQuery = "SELECT NOW()";
131
+
122 132
         public String getUrl() {
123 133
             return url;
124 134
         }
@@ -142,5 +152,69 @@ public class IotProperties {
142 152
         public void setPassword(String password) {
143 153
             this.password = password;
144 154
         }
155
+
156
+        public int getMaximumPoolSize() {
157
+            return maximumPoolSize;
158
+        }
159
+
160
+        public void setMaximumPoolSize(int maximumPoolSize) {
161
+            this.maximumPoolSize = maximumPoolSize;
162
+        }
163
+
164
+        public int getMinimumIdle() {
165
+            return minimumIdle;
166
+        }
167
+
168
+        public void setMinimumIdle(int minimumIdle) {
169
+            this.minimumIdle = minimumIdle;
170
+        }
171
+
172
+        public long getConnectionTimeout() {
173
+            return connectionTimeout;
174
+        }
175
+
176
+        public void setConnectionTimeout(long connectionTimeout) {
177
+            this.connectionTimeout = connectionTimeout;
178
+        }
179
+
180
+        public long getIdleTimeout() {
181
+            return idleTimeout;
182
+        }
183
+
184
+        public void setIdleTimeout(long idleTimeout) {
185
+            this.idleTimeout = idleTimeout;
186
+        }
187
+
188
+        public long getMaxLifetime() {
189
+            return maxLifetime;
190
+        }
191
+
192
+        public void setMaxLifetime(long maxLifetime) {
193
+            this.maxLifetime = maxLifetime;
194
+        }
195
+
196
+        public long getLeakDetectionThreshold() {
197
+            return leakDetectionThreshold;
198
+        }
199
+
200
+        public void setLeakDetectionThreshold(long leakDetectionThreshold) {
201
+            this.leakDetectionThreshold = leakDetectionThreshold;
202
+        }
203
+
204
+        public long getValidationTimeout() {
205
+            return validationTimeout;
206
+        }
207
+
208
+        public void setValidationTimeout(long validationTimeout) {
209
+            this.validationTimeout = validationTimeout;
210
+        }
211
+
212
+        public String getConnectionTestQuery() {
213
+            return connectionTestQuery;
214
+        }
215
+
216
+        public void setConnectionTestQuery(String connectionTestQuery) {
217
+            this.connectionTestQuery = connectionTestQuery;
218
+        }
145 219
     }
146 220
 }

+ 5
- 2
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Parādīt failu

@@ -41,11 +41,14 @@ public class MqttChargeStationConsumer extends AbstractDynamicMqttConsumer {
41 41
 
42 42
     @Override
43 43
     protected void processMessage(String content, String topic) throws Exception {
44
-        List<Map<String, Object>> messageList = JSON.parseObject(content, new TypeReference<List<Map<String, Object>>>() {});
44
+        List<Map<String, Object>> messageList = JSON.parseObject(
45
+                content, new TypeReference<List<Map<String, Object>>>() {});
45 46
         processMessageAndWriteToTdEngine(messageList, topic);
46 47
     }
47 48
 
48
-    private void processMessageAndWriteToTdEngine(List<Map<String, Object>> dataList, String topic) throws Exception {
49
+    private void processMessageAndWriteToTdEngine(List<Map<String, Object>> dataList, String topic)
50
+            throws Exception {
51
+
49 52
         if (dataList == null || dataList.isEmpty()) {
50 53
             return;
51 54
         }

+ 88
- 118
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Parādīt failu

@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.stereotype.Service;
10 10
 
11 11
 import java.sql.*;
12
+import java.time.ZoneOffset;
12 13
 import java.util.*;
13 14
 import java.util.concurrent.*;
14 15
 
@@ -49,24 +50,28 @@ public class TdEngineService {
49 50
     // 默认 VARCHAR 字段长度限制
50 51
     private static final int DEFAULT_VARCHAR_LENGTH = 38;
51 52
 
53
+    // 东八区时区偏移(避免重复创建)
54
+    private static final ZoneOffset ZONE_OFFSET_8 = ZoneOffset.of("+8");
55
+
52 56
     private synchronized void initDataSource() {
53 57
         if (dataSourceInitialized) {
54 58
             return;
55 59
         }
56 60
         try {
61
+            IotProperties.TDengine tdCfg = iotProperties.getTdengine();
57 62
             HikariConfig config = new HikariConfig();
58
-            config.setJdbcUrl(iotProperties.getTdengine().getUrl());
59
-            config.setUsername(iotProperties.getTdengine().getUsername());
60
-            config.setPassword(iotProperties.getTdengine().getPassword());
63
+            config.setJdbcUrl(tdCfg.getUrl());
64
+            config.setUsername(tdCfg.getUsername());
65
+            config.setPassword(tdCfg.getPassword());
61 66
             config.setDriverClassName("com.taosdata.jdbc.TSDBDriver");
62
-            config.setMaximumPoolSize(20);
63
-            config.setMinimumIdle(5);
64
-            config.setConnectionTimeout(10000);
65
-            config.setIdleTimeout(30000);
66
-            config.setMaxLifetime(600000);
67
-            config.setLeakDetectionThreshold(0);
68
-            config.setConnectionTestQuery("SELECT NOW()");
69
-            config.setValidationTimeout(3000);
67
+            config.setMaximumPoolSize(tdCfg.getMaximumPoolSize());
68
+            config.setMinimumIdle(tdCfg.getMinimumIdle());
69
+            config.setConnectionTimeout(tdCfg.getConnectionTimeout());
70
+            config.setIdleTimeout(tdCfg.getIdleTimeout());
71
+            config.setMaxLifetime(tdCfg.getMaxLifetime());
72
+            config.setLeakDetectionThreshold(tdCfg.getLeakDetectionThreshold());
73
+            config.setConnectionTestQuery(tdCfg.getConnectionTestQuery());
74
+            config.setValidationTimeout(tdCfg.getValidationTimeout());
70 75
             this.dataSource = new HikariDataSource(config);
71 76
             log.info("TdEngine 连接池初始化完成");
72 77
         } catch (Exception e) {
@@ -146,50 +151,37 @@ public class TdEngineService {
146 151
             return cached;
147 152
         }
148 153
 
149
-        Set<String> columns = loadStableColumnsFromDB(dbName, stableName);
150
-        if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
151
-            stableColumnCache.clear();
152
-            log.warn("TdEngine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
153
-        }
154
-        stableColumnCache.put(key, columns);
155
-        return columns;
154
+        // 使用 computeIfAbsent 保证原子性,避免多线程重复加载
155
+        return stableColumnCache.computeIfAbsent(key, k -> {
156
+            Set<String> columns = loadStableColumnsFromDB(dbName, stableName);
157
+            // 缓存满时淘汰最老的 20% 条目
158
+            if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
159
+                int toRemove = Math.max(1, MAX_CACHE_SIZE / 5);
160
+                Iterator<String> it = stableColumnCache.keySet().iterator();
161
+                for (int i = 0; i < toRemove && it.hasNext(); i++) {
162
+                    it.next();
163
+                    it.remove();
164
+                }
165
+                log.warn("TdEngine 超级表缓存已达上限({}),已淘汰 {} 个条目", MAX_CACHE_SIZE, toRemove);
166
+            }
167
+            return columns;
168
+        });
156 169
     }
157 170
 
158 171
     private Set<String> loadStableColumnsFromDB(String dbName, String stableName) {
159 172
         Set<String> columns = new HashSet<>();
160 173
         String sql = String.format("DESCRIBE %s.%s", wrapName(dbName), wrapName(stableName));
161
-        Connection conn = null;
162
-        Statement stmt = null;
163
-        ResultSet rs = null;
164
-
165
-        try {
166
-            conn = getConnection();
167
-            stmt = conn.createStatement();
174
+        try (Connection conn = getConnection();
175
+             Statement stmt = conn.createStatement();
176
+             ResultSet rs = stmt.executeQuery(sql)) {
168 177
             stmt.setQueryTimeout(5);
169
-            rs = stmt.executeQuery(sql);
170 178
             while (rs.next()) {
171 179
                 columns.add(rs.getString(1));
172 180
             }
173 181
         } catch (SQLException e) {
174 182
             if (!e.getMessage().contains("Table does not exist") && !e.getMessage().contains("invalid name")) {
175
-                log.error("查询超级表列失败: {}.{} | {}", dbName, stableName, e.getMessage());
176
-            }
177
-        } finally {
178
-            if (rs != null) {
179
-                try {
180
-                    rs.close();
181
-                } catch (SQLException e) {
182
-                    log.debug("关闭 ResultSet 时发生异常: {}", e.getMessage());
183
-                }
183
+                log.debug("查询超级表列失败: {}.{} | {}", dbName, stableName, e.getMessage());
184 184
             }
185
-            if (stmt != null) {
186
-                try {
187
-                    stmt.close();
188
-                } catch (SQLException e) {
189
-                    log.debug("关闭 Statement 时发生异常: {}", e.getMessage());
190
-                }
191
-            }
192
-            closeConnection(conn);
193 185
         }
194 186
         return columns;
195 187
     }
@@ -341,7 +333,14 @@ public class TdEngineService {
341 333
         }
342 334
 
343 335
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
344
-        int totalColumns = existingColumns.size() + columnTypes.size();
336
+        // 计算实际新增列数(扣除已有列的交集)
337
+        int newColumns = 0;
338
+        for (String col : columnTypes.keySet()) {
339
+            if (!existingColumns.contains(col)) {
340
+                newColumns++;
341
+            }
342
+        }
343
+        int totalColumns = existingColumns.size() + newColumns;
345 344
         if (totalColumns > MAX_COLUMNS_PER_STABLE) {
346 345
             log.error("超级表总列数({})超过限制({}),跳过本次插入 | 表: {}.{}",
347 346
                     totalColumns, MAX_COLUMNS_PER_STABLE, dbName, table);
@@ -402,8 +401,8 @@ public class TdEngineService {
402 401
             return "DOUBLE";
403 402
         }
404 403
         // 时间类型(Date, Timestamp, LocalDateTime 等)
405
-        if (value instanceof java.util.Date || value instanceof java.sql.Timestamp
406
-                || value instanceof java.time.LocalDateTime || value instanceof java.time.Instant) {
404
+        if (value instanceof java.util.Date || value instanceof java.time.LocalDateTime
405
+                || value instanceof java.time.Instant) {
407 406
             return "TIMESTAMP";
408 407
         }
409 408
         return "VARCHAR";
@@ -460,11 +459,9 @@ public class TdEngineService {
460 459
             if (value instanceof java.util.Date) {
461 460
                 return String.valueOf(((java.util.Date) value).getTime());
462 461
             }
463
-            if (value instanceof java.sql.Timestamp) {
464
-                return String.valueOf(((java.sql.Timestamp) value).getTime());
465
-            }
466 462
             if (value instanceof java.time.LocalDateTime) {
467
-                return String.valueOf(((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.of("+8")).toEpochMilli());
463
+                return String.valueOf(((java.time.LocalDateTime) value)
464
+                        .toInstant(ZONE_OFFSET_8).toEpochMilli());
468 465
             }
469 466
             if (value instanceof java.time.Instant) {
470 467
                 return String.valueOf(((java.time.Instant) value).toEpochMilli());
@@ -482,7 +479,7 @@ public class TdEngineService {
482 479
             }
483 480
         }
484 481
         if (strValue.length() > maxLen) {
485
-            log.info("字段值超长,截断存储 | 列: {} | 值长度: {} | 最大: {} | 截断后: {}...",
482
+            log.debug("字段值超长,截断存储 | 列: {} | 值长度: {} | 最大: {} | 截断后: {}...",
486 483
                     columnName, strValue.length(), maxLen, strValue.substring(0, maxLen));
487 484
             strValue = strValue.substring(0, maxLen);
488 485
         }
@@ -503,11 +500,7 @@ public class TdEngineService {
503 500
             return;
504 501
         }
505 502
 
506
-        Connection conn = null;
507
-        Statement stmt = null;
508
-        try {
509
-            conn = getConnection();
510
-            stmt = conn.createStatement();
503
+        try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
511 504
             stmt.setQueryTimeout(10);
512 505
 
513 506
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
@@ -544,21 +537,12 @@ public class TdEngineService {
544 537
                             throw e;
545 538
                         }
546 539
                     }
547
-
548
-                    // 更新缓存
549
-                    String key = getStableKey(dbName, superTableName);
550
-                    stableColumnCache.put(key, existingColumns);
551
-                }
552
-            }
553
-        } finally {
554
-            if (stmt != null) {
555
-                try {
556
-                    stmt.close();
557
-                } catch (SQLException e) {
558
-                    log.debug("关闭 Statement 时发生异常: {}", e.getMessage());
559 540
                 }
560 541
             }
561
-            closeConnection(conn);
542
+
543
+            // 循环结束后统一更新缓存
544
+            String key = getStableKey(dbName, superTableName);
545
+            stableColumnCache.put(key, existingColumns);
562 546
         }
563 547
     }
564 548
 
@@ -571,63 +555,49 @@ public class TdEngineService {
571 555
                     + ", superTableName=" + superTableName + ", table=" + table);
572 556
         }
573 557
 
574
-        Connection conn = null;
575
-        PreparedStatement pstmt = null;
576
-        Statement stmt = null;
577
-        ResultSet rs = null;
578
-        try {
579
-            conn = getConnection();
580
-
558
+        try (Connection conn = getConnection()) {
581 559
             // 检查超级表是否存在 — 使用 PreparedStatement 防止 SQL 注入
582 560
             String checkStableSql = "SELECT * FROM information_schema.ins_tables WHERE table_name = ? AND db_name = ?";
583
-            pstmt = conn.prepareStatement(checkStableSql);
584
-            pstmt.setString(1, superTableName);
585
-            pstmt.setString(2, dbName);
586
-            pstmt.setQueryTimeout(5);
587
-            rs = pstmt.executeQuery();
588
-            boolean stableExists = rs.next();
589
-
590
-            if (!stableExists) {
591
-                closeQuietly(rs, pstmt);
592
-                rs = null;
593
-                pstmt = null;
594
-                log.info("超级表不存在,创建: {}.{}", dbName, superTableName);
595
-                initTableStructure(dbName, superTableName, table);
596
-                return;
561
+            try (PreparedStatement pstmt = conn.prepareStatement(checkStableSql)) {
562
+                pstmt.setString(1, superTableName);
563
+                pstmt.setString(2, dbName);
564
+                pstmt.setQueryTimeout(5);
565
+                try (ResultSet rs = pstmt.executeQuery()) {
566
+                    if (!rs.next()) {
567
+                        log.info("超级表不存在,创建: {}.{}", dbName, superTableName);
568
+                        initTableStructure(dbName, superTableName, table);
569
+                        return;
570
+                    }
571
+                }
597 572
             }
598 573
 
599
-            closeQuietly(rs, pstmt);
600
-            rs = null;
601
-            pstmt = null;
602
-
603 574
             // 检查子表是否存在 — 使用 PreparedStatement 防止 SQL 注入
604
-            String checkTableSql = "SELECT * FROM information_schema.ins_tables WHERE table_name = ? AND db_name = ? AND table_type = 'CHILD_TABLE'";
605
-            pstmt = conn.prepareStatement(checkTableSql);
606
-            pstmt.setString(1, table);
607
-            pstmt.setString(2, dbName);
608
-            pstmt.setQueryTimeout(5);
609
-            rs = pstmt.executeQuery();
610
-            boolean tableExists = rs.next();
611
-
612
-            if (!tableExists) {
613
-                String tableSql = String.format(
614
-                        "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
615
-                        wrapName(dbName),
616
-                        wrapName(table),
617
-                        wrapName(dbName),
618
-                        wrapName(superTableName),
619
-                        escapeValue(superTableName)
620
-                );
621
-                stmt = conn.createStatement();
622
-                stmt.setQueryTimeout(5);
623
-                stmt.executeUpdate(tableSql);
624
-                log.info("子表创建成功: {}", table);
575
+            String checkTableSql = "SELECT * FROM information_schema.ins_tables " +
576
+                    "WHERE table_name = ? AND db_name = ? AND table_type = 'CHILD_TABLE'";
577
+            try (PreparedStatement pstmt = conn.prepareStatement(checkTableSql)) {
578
+                pstmt.setString(1, table);
579
+                pstmt.setString(2, dbName);
580
+                pstmt.setQueryTimeout(5);
581
+                try (ResultSet rs = pstmt.executeQuery()) {
582
+                    if (!rs.next()) {
583
+                        String tableSql = String.format(
584
+                                "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
585
+                                wrapName(dbName),
586
+                                wrapName(table),
587
+                                wrapName(dbName),
588
+                                wrapName(superTableName),
589
+                                escapeValue(superTableName)
590
+                        );
591
+                        try (Statement stmt = conn.createStatement()) {
592
+                            stmt.setQueryTimeout(5);
593
+                            stmt.executeUpdate(tableSql);
594
+                            log.info("子表创建成功: {}", table);
595
+                        }
596
+                    }
597
+                }
625 598
             }
626 599
         } catch (SQLException e) {
627 600
             throw new SQLException("检查表存在性失败: " + dbName + "." + table + " | " + e.getMessage(), e);
628
-        } finally {
629
-            closeQuietly(rs, pstmt, stmt);
630
-            closeConnection(conn);
631 601
         }
632 602
     }
633 603
 

Notiek ielāde…
Atcelt
Saglabāt