Selaa lähdekoodia

Merge branch 'master' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511

 Conflicts:
	iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java
mqy20260511
lenovo 2 viikkoa sitten
vanhempi
commit
fa665b2448

+ 55
- 0
iot-platform/src/main/java/com/iot/platform/controller/IotController.java Näytä tiedosto

@@ -0,0 +1,55 @@
1
+package com.iot.platform.controller;
2
+
3
+import com.iot.platform.common.AjaxResult;
4
+import com.iot.platform.service.TdEngineService;
5
+import org.springframework.beans.factory.annotation.Autowired;
6
+import org.springframework.data.redis.core.StringRedisTemplate;
7
+import org.springframework.web.bind.annotation.*;
8
+
9
+import java.util.Map;
10
+
11
+@RestController
12
+@RequestMapping("/iot")
13
+public class IotController {
14
+
15
+    @Autowired
16
+    private StringRedisTemplate stringRedisTemplate;
17
+
18
+    @Autowired
19
+    private TdEngineService tdEngineService;
20
+
21
+    /**
22
+     * 根据 controllerId、deviceId 和 key 查询 Redis 中存储的数据
23
+     */
24
+    @GetMapping("/redis/query")
25
+    public AjaxResult queryRedisData(@RequestParam String controllerId,
26
+                                     @RequestParam String deviceId,
27
+                                     @RequestParam(required = false) String key) {
28
+        String redisKey = "pe_iot_" + controllerId + "_" + deviceId;
29
+
30
+        if (key == null || key.trim().isEmpty()) {
31
+            // 查询整个 hash
32
+            Map<Object, Object> data = stringRedisTemplate.opsForHash().entries(redisKey);
33
+            return AjaxResult.success(data);
34
+        } else {
35
+            // 查询指定 field
36
+            Object value = stringRedisTemplate.opsForHash().get(redisKey, key);
37
+            return AjaxResult.success(value);
38
+        }
39
+    }
40
+
41
+    /**
42
+     * 根据 controllerId、deviceId 和 key 查询 TDengine 中存储的数据
43
+     */
44
+    @GetMapping("/tdengine/query")
45
+    public AjaxResult queryTdEngineData(@RequestParam String controllerId,
46
+                                        @RequestParam String deviceId,
47
+                                        @RequestParam(required = false) String key) {
48
+        try {
49
+            Object result = tdEngineService.queryLatestData(controllerId, deviceId, key);
50
+            return AjaxResult.success(result);
51
+        } catch (Exception e) {
52
+            return AjaxResult.error("查询TDengine失败: " + e.getMessage());
53
+        }
54
+    }
55
+}

+ 4
- 4
iot-platform/src/main/java/com/iot/platform/mapper/SysRealtimeMapper.java Näytä tiedosto

@@ -9,8 +9,8 @@ import java.util.List;
9 9
 public interface SysRealtimeMapper {
10 10
     void createRealtime(@Param("tableName")String tableName);
11 11
     List<String> selectTables();
12
-    void insertTables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("deviceId")String deviceId,@Param("timestamp")String timestamp,@Param("k")String k,@Param("v")Object v);
13
-    void updateTables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("v")Object v,@Param("timestamp")String timestamp,@Param("k")String k,@Param("deviceId")String deviceId);
14
-    Integer selectKey(@Param("tableName")String tableName,@Param("k")String k);
15
-    List<String> selectAllKeys(@Param("tableName")String tableName);
12
+    void insertTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId, @Param("timestamp")String timestamp, @Param("k")String k, @Param("v")Object v);
13
+    void updateTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("v")Object v, @Param("timestamp")String timestamp, @Param("k")String k, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId);
14
+    Integer selectKey(@Param("tableName")String tableName, @Param("k")String k, @Param("controllerId")String controllerId);
15
+    List<String> selectAllKeys(@Param("tableName")String tableName, @Param("controllerId")String controllerId);
16 16
 }

+ 10
- 16
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Näytä tiedosto

@@ -12,7 +12,6 @@ import org.springframework.context.annotation.DependsOn;
12 12
 import org.springframework.data.redis.core.StringRedisTemplate;
13 13
 import org.springframework.stereotype.Component;
14 14
 
15
-import java.time.LocalDate;
16 15
 import java.time.LocalDateTime;
17 16
 import java.time.format.DateTimeFormatter;
18 17
 import java.util.*;
@@ -106,18 +105,16 @@ public class MqttDynamicConsumer extends AbstractDynamicMqttConsumer {
106 105
         data.put("device_id", ctx.deviceId);
107 106
         List<Map<String, Object>> batch = Collections.singletonList(data);
108 107
 
109
-        String dbNamePrefix = "pe_iot_"+ctx.dbName.substring(0, 2);
110
-        String controllerId = ctx.dbName;
111
-        tdengineService.insertBatch(dbNamePrefix, ctx.superTable, controllerId, batch);
108
+        String dbNamePrefix = "pe_iot_"+ctx.controllerId.substring(0, 2);
109
+        String controllerId = ctx.controllerId;
110
+        tdengineService.insertBatch(dbNamePrefix, ctx.deviceId, controllerId, batch);
112 111
     }
113 112
 
114 113
     private void writeToRedis(MessageContext ctx) {
115
-        String redisKey = "DSB:" + ctx.dbName + ":" + ctx.controllerName;
114
+        String redisKey = "pe_iot_" + ctx.controllerId + "_" + ctx.deviceId;
116 115
 
117 116
         Map<String, String> hashData = new HashMap<>();
118
-        hashData.put("createTime", ctx.currentTime);
119 117
         hashData.put("timestamp", ctx.timestamp);
120
-        hashData.put("device_id", ctx.deviceId);
121 118
 
122 119
         for (Map.Entry<String, Object> entry : ctx.metricData.entrySet()) {
123 120
             if (entry.getValue() != null) {
@@ -126,25 +123,22 @@ public class MqttDynamicConsumer extends AbstractDynamicMqttConsumer {
126 123
         }
127 124
 
128 125
         stringRedisTemplate.opsForHash().putAll(redisKey, hashData);
129
-        stringRedisTemplate.expire(redisKey, 2, TimeUnit.HOURS);
130
-        stringRedisTemplate.opsForSet().add("DSB:active:devices", redisKey);
131
-        stringRedisTemplate.expire("DSB:active:devices", 2, TimeUnit.HOURS);
126
+        stringRedisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
127
+        stringRedisTemplate.opsForSet().add("iot:dsb:active:devices", redisKey);
132 128
     }
133 129
 
134 130
     private static class MessageContext {
135
-        final String dbName;
136
-        final String superTable;
131
+        final String controllerId;
132
+        final String deviceId;
137 133
         final String controllerName;
138 134
         final JSONObject metricData;
139 135
         final String currentTime;
140 136
         final String timestamp;
141
-        final String deviceId;
142 137
 
143
-        MessageContext(String dbName, String superTable, String controllerName,
138
+        MessageContext(String controllerId, String devcieId, String controllerName,
144 139
                        JSONObject metricData, String currentTime,
145 140
                        String timestamp, String deviceId) {
146
-            this.dbName = dbName;
147
-            this.superTable = superTable;
141
+            this.controllerId = controllerId;
148 142
             this.controllerName = controllerName;
149 143
             this.metricData = metricData;
150 144
             this.currentTime = currentTime;

+ 8
- 8
iot-platform/src/main/java/com/iot/platform/service/SysRealtimeService.java Näytä tiedosto

@@ -30,24 +30,24 @@ public class SysRealtimeService {
30 30
         return sysRealtimeMapper.selectTables();
31 31
     }
32 32
 
33
-    public void insertTables(String tableName, String createTime, String deviceId, String timestamp, String k, Object v) {
33
+    public void insertTables(String tableName, String createTime, String controllerId, String deviceId, String timestamp, String k, Object v) {
34 34
         validateTableName(tableName);
35
-        sysRealtimeMapper.insertTables(tableName, createTime, deviceId, timestamp, k, v);
35
+        sysRealtimeMapper.insertTables(tableName, createTime, controllerId, deviceId, timestamp, k, v);
36 36
     }
37 37
 
38
-    public void updateTables(String tableName, String createTime, Object v, String timestamp, String k, String deviceId) {
38
+    public void updateTables(String tableName, String createTime, Object v, String timestamp, String k, String controllerId, String deviceId) {
39 39
         validateTableName(tableName);
40
-        sysRealtimeMapper.updateTables(tableName, createTime, v, timestamp, k, deviceId);
40
+        sysRealtimeMapper.updateTables(tableName, createTime, v, timestamp, k, controllerId, deviceId);
41 41
     }
42 42
 
43
-    public Integer selectKey(String tableName, String k) {
43
+    public Integer selectKey(String tableName, String k, String controllerId) {
44 44
         validateTableName(tableName);
45
-        return sysRealtimeMapper.selectKey(tableName, k);
45
+        return sysRealtimeMapper.selectKey(tableName, k, controllerId);
46 46
     }
47 47
 
48
-    public List<String> selectAllKeys(String tableName) {
48
+    public List<String> selectAllKeys(String tableName, String controllerId) {
49 49
         validateTableName(tableName);
50
-        return sysRealtimeMapper.selectAllKeys(tableName);
50
+        return sysRealtimeMapper.selectAllKeys(tableName, controllerId);
51 51
     }
52 52
 
53 53
     private void validateTableName(String tableName) {

+ 143
- 29
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Näytä tiedosto

@@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
8 8
 import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.stereotype.Service;
10 10
 
11
+import java.nio.charset.StandardCharsets;
11 12
 import java.math.BigDecimal;
12 13
 import java.sql.*;
13 14
 import java.time.ZoneOffset;
@@ -48,11 +49,11 @@ public class TdEngineService {
48 49
     // 每批次最大列数(防止 TDengine 行超限)
49 50
     private static final int MAX_COLUMNS_PER_INSERT = 100;
50 51
 
51
-    // 拆分超级表的列数阈值
52
-    private static final int SPLIT_STABLE_COLUMN_THRESHOLD = 100;
52
+    // 默认 VARCHAR 字段长度限制(用于未提供实际长度时的兜底)
53
+    private static final int DEFAULT_VARCHAR_LENGTH = 16;
53 54
 
54
-    // 默认 VARCHAR 字段长度限制
55
-    private static final int DEFAULT_VARCHAR_LENGTH = 128;
55
+    // TDengine 数据保留天数(超过此天数的数据自动删除)
56
+    private static final int DATA_RETENTION_DAYS = 180;
56 57
 
57 58
     // 东八区时区偏移(避免重复创建)
58 59
     private static final ZoneOffset ZONE_OFFSET_8 = ZoneOffset.of("+8");
@@ -237,7 +238,7 @@ public class TdEngineService {
237 238
         try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
238 239
             stmt.setQueryTimeout(10);
239 240
 
240
-            stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName));
241
+            stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName) + " KEEP " + DATA_RETENTION_DAYS);
241 242
 
242 243
             // 创建超级表:固定 ts + controller_id 列(TDengine 要求至少 2 列)
243 244
             String stableSql = String.format(
@@ -292,17 +293,18 @@ public class TdEngineService {
292 293
     private void splitAndInsertToMultipleStables(String dbName, String stableName, String controllerId,
293 294
                                                  List<Map<String, Object>> dataList,
294 295
                                                  Map<String, String> columnTypes) throws SQLException {
296
+
295 297
         // 按首字符 UTF-8 值分组
296
-        Map<Integer, Map<String, String>> groupColumnTypes = new LinkedHashMap<>();
298
+        Map<Integer, Map<String, String>> groupColumnTypeMap = new LinkedHashMap<>();
297 299
         for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
298 300
             int groupId = getFirstCharGroupId(entry.getKey());
299
-            groupColumnTypes.computeIfAbsent(groupId, k -> new LinkedHashMap<>()).put(entry.getKey(), entry.getValue());
301
+            groupColumnTypeMap.computeIfAbsent(groupId, k -> new LinkedHashMap<>()).put(entry.getKey(), entry.getValue());
300 302
         }
301 303
 
302
-        log.info("拆分后超级表数量: {} | 分组: {}", groupColumnTypes.size(), groupColumnTypes.keySet());
304
+        log.info("拆分后超级表数量: {} | 分组: {}", groupColumnTypeMap.size(), groupColumnTypeMap.keySet());
303 305
 
304 306
         // 每个分组作为一个超级表,处理其对应的列和数据
305
-        for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypes.entrySet()) {
307
+        for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypeMap.entrySet()) {
306 308
             int groupId = group.getKey();
307 309
             // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
308 310
             String newStableName = stableName+"_"+groupId;
@@ -330,7 +332,7 @@ public class TdEngineService {
330 332
             return 0;
331 333
         }
332 334
         String firstChar = columnName.substring(0, 1);
333
-        byte[] bytes = firstChar.getBytes(java.nio.charset.StandardCharsets.UTF_8);
335
+        byte[] bytes = firstChar.getBytes(StandardCharsets.UTF_8);
334 336
         return (bytes[0] & 0xFF) % 10;
335 337
     }
336 338
 
@@ -340,7 +342,9 @@ public class TdEngineService {
340 342
     private List<Map<String, Object>> filterDataByColumnGroup(List<Map<String, Object>> dataList, Set<String> columns) {
341 343
         List<Map<String, Object>> filtered = new ArrayList<>();
342 344
         for (Map<String, Object> data : dataList) {
343
-            if (data == null) continue;
345
+            if (data == null) {
346
+                continue;
347
+            }
344 348
             Map<String, Object> filteredRow = new LinkedHashMap<>();
345 349
             for (String col : columns) {
346 350
                 if (data.containsKey(col)) {
@@ -352,11 +356,6 @@ public class TdEngineService {
352 356
         return filtered;
353 357
     }
354 358
 
355
-    private String extractSuperTableName(String table) {
356
-        int idx = table.lastIndexOf('_');
357
-        return idx > 0 ? table.substring(0, idx) : table;
358
-    }
359
-
360 359
     /**
361 360
      * 收集数据中所有动态列及其类型
362 361
      */
@@ -379,6 +378,31 @@ public class TdEngineService {
379 378
     }
380 379
 
381 380
     /**
381
+     * 收集数据中所有动态列的最大字符串长度(用于动态 VARCHAR 长度)
382
+     */
383
+    private Map<String, Integer> collectColumnMaxLengths(List<Map<String, Object>> dataList) {
384
+        Map<String, Integer> maxLengths = new HashMap<>();
385
+        for (Map<String, Object> data : dataList) {
386
+            if (data == null) {
387
+                continue;
388
+            }
389
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
390
+                String key = entry.getKey();
391
+                if (!isValidFieldName(key) || isReservedColumn(key)) {
392
+                    continue;
393
+                }
394
+                Object value = entry.getValue();
395
+                if (value != null) {
396
+                    int len = value.toString().length();
397
+                    maxLengths.merge(key, len, Math::max);
398
+                }
399
+            }
400
+        }
401
+        log.debug("收集到的列最大长度: {}", maxLengths);
402
+        return maxLengths;
403
+    }
404
+
405
+    /**
382 406
      * 构建批量插入 SQL
383 407
      */
384 408
     private String buildInsertSql(String dbName, String table, String superTableName,
@@ -432,7 +456,7 @@ public class TdEngineService {
432 456
         if (!hasData) {
433 457
             return null;
434 458
         }
435
-        log.info("生成的 INSERT SQL | 列类型: {} | SQL 前100字符: {}", columnTypes, sql.toString().substring(0, Math.min(100, sql.toString().length())));
459
+        log.info("生成的 INSERT SQL | 列类型: {} | SQL 前100字符: {}", columnTypes, sql.substring(0, Math.min(100, sql.toString().length())));
436 460
         sql.setLength(sql.length() - 1);
437 461
         return sql.toString();
438 462
     }
@@ -456,6 +480,8 @@ public class TdEngineService {
456 480
             return;
457 481
         }
458 482
 
483
+        Map<String, Integer> columnMaxLengths = collectColumnMaxLengths(dataList);
484
+
459 485
         log.info("收集到的列类型: {}", columnTypes);
460 486
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
461 487
         log.info("超级表已有列: {}", existingColumns);
@@ -473,7 +499,7 @@ public class TdEngineService {
473 499
             return;
474 500
         }
475 501
 
476
-        ensureColumnsExist(dbName, superTableName, columnTypes);
502
+        ensureColumnsExist(dbName, superTableName, columnTypes, columnMaxLengths);
477 503
 
478 504
         existingColumns = getStableColumns(dbName, superTableName);
479 505
         if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
@@ -564,7 +590,7 @@ public class TdEngineService {
564 590
     /**
565 591
      * 根据 TdEngine 列类型获取创建列的 SQL 类型
566 592
      */
567
-    private String getColumnTypeForDDL(String tdType, String columnName) {
593
+    private String getColumnTypeForDDL(String tdType, Integer maxLen) {
568 594
         switch (tdType) {
569 595
             case "BOOL":
570 596
             case "BIGINT":
@@ -573,7 +599,8 @@ public class TdEngineService {
573 599
                 return tdType;
574 600
             case "VARCHAR":
575 601
             default:
576
-                return "VARCHAR(" + DEFAULT_VARCHAR_LENGTH + ")";
602
+                int len = (maxLen != null && maxLen > 0) ? maxLen + 5 : DEFAULT_VARCHAR_LENGTH;
603
+                return "VARCHAR(" + len + ")";
577 604
         }
578 605
     }
579 606
 
@@ -617,20 +644,16 @@ public class TdEngineService {
617 644
             }
618 645
         }
619 646
 
620
-        // 字符串类型校验长度
621
-        if (strValue.length() > DEFAULT_VARCHAR_LENGTH) {
622
-            log.debug("字段值超长,截断存储 | 列: {} | 值长度: {} | 最大: {} | 截断后: {}...",
623
-                    columnName, strValue.length(), DEFAULT_VARCHAR_LENGTH, strValue.substring(0, DEFAULT_VARCHAR_LENGTH));
624
-            strValue = strValue.substring(0, DEFAULT_VARCHAR_LENGTH);
625
-        }
647
+        // 字符串类型:使用实际值长度 + 5 冗余,不截断
648
+        // (TDengine VARCHAR 实际限制在创建列时已保证充足)
626 649
         return "'" + escapeValue(strValue) + "'";
627 650
     }
628 651
 
629 652
     /**
630 653
      * 确保列存在,如有新列则 ALTER 添加(根据类型添加对应列)
631 654
      */
632
-    private void ensureColumnsExist(String dbName, String superTableName, Map<String, String> columnTypes)
633
-            throws SQLException {
655
+    private void ensureColumnsExist(String dbName, String superTableName, Map<String, String> columnTypes,
656
+                                     Map<String, Integer> columnMaxLengths) throws SQLException {
634 657
 
635 658
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
636 659
 
@@ -646,6 +669,7 @@ public class TdEngineService {
646 669
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
647 670
                 String col = entry.getKey();
648 671
                 String colType = entry.getValue();
672
+                Integer maxLen = columnMaxLengths != null ? columnMaxLengths.get(col) : null;
649 673
 
650 674
                 // 再次检查列数上限
651 675
                 if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
@@ -655,7 +679,7 @@ public class TdEngineService {
655 679
                 }
656 680
 
657 681
                 if (!existingColumns.contains(col)) {
658
-                    String ddlType = getColumnTypeForDDL(colType, col);
682
+                    String ddlType = getColumnTypeForDDL(colType, maxLen);
659 683
                     String alterSql = String.format(
660 684
                             "ALTER STABLE %s.%s ADD COLUMN %s %s",
661 685
                             wrapName(dbName),
@@ -752,6 +776,96 @@ public class TdEngineService {
752 776
         log.info("清除了 TdEngine 超级表结构缓存");
753 777
     }
754 778
 
779
+    /**
780
+     * 根据 controllerId、deviceId 查询 TDengine 最新数据
781
+     * @param controllerId 控制器ID
782
+     * @param deviceId 设备ID
783
+     * @param key 列名,不传则返回所有列
784
+     */
785
+    public Object queryLatestData(String controllerId, String deviceId, String key) throws SQLException {
786
+        // 构建 dbName 前缀
787
+        String dbNamePrefix = "pe_iot_" + controllerId.substring(0, 2);
788
+
789
+        // 遍历 10 个分组查找数据
790
+        for (int groupId = 0; groupId < 10; groupId++) {
791
+            // 表名格式: {deviceId}_{groupId}_{controllerId} (与 insertBatch 保持一致)
792
+            String tableName = deviceId + "_" + groupId + "_" + controllerId;
793
+
794
+            // 检查表是否存在
795
+            if (!isTableExists(dbNamePrefix, tableName)) {
796
+                continue;
797
+            }
798
+
799
+            // 构建查询 SQL
800
+            String colPart = (key != null && !key.trim().isEmpty()) ? wrapName(key) : "*";
801
+            String sql = String.format("SELECT %s FROM %s.%s ORDER BY ts DESC LIMIT 1",
802
+                    colPart, wrapName(dbNamePrefix), wrapName(tableName));
803
+
804
+            try (Connection conn = getConnection();
805
+                 Statement stmt = conn.createStatement();
806
+                 ResultSet rs = stmt.executeQuery(sql)) {
807
+
808
+                if (rs.next()) {
809
+                    if (key != null && !key.trim().isEmpty()) {
810
+                        // 返回指定列的值
811
+                        return rs.getObject(1);
812
+                    } else {
813
+                        // 返回整行数据
814
+                        java.util.Map<String, Object> row = new java.util.LinkedHashMap<>();
815
+                        ResultSetMetaData metaData = rs.getMetaData();
816
+                        int columnCount = metaData.getColumnCount();
817
+                        for (int i = 1; i <= columnCount; i++) {
818
+                            row.put(metaData.getColumnLabel(i), rs.getObject(i));
819
+                        }
820
+                        return row;
821
+                    }
822
+                }
823
+            } catch (SQLException e) {
824
+                // 列不存在,继续查找下一个分组
825
+                if (e.getMessage().contains("Invalid column")) {
826
+                    log.debug("表 {} 中无列 {},继续查找其他分组", tableName, key);
827
+                    continue;
828
+                }
829
+                throw e;
830
+            }
831
+        }
832
+        return null;
833
+    }
834
+
835
+    /**
836
+     * 检查表是否存在
837
+     */
838
+    private boolean isTableExists(String dbName, String tableName) throws SQLException {
839
+        // 先检查数据库是否存在
840
+        String checkDbSql = "SHOW DATABASES";
841
+        try (Connection conn = getConnection();
842
+             Statement stmt = conn.createStatement()) {
843
+            stmt.setQueryTimeout(5);
844
+            boolean dbExists = false;
845
+            try (ResultSet rs = stmt.executeQuery(checkDbSql)) {
846
+                while (rs.next()) {
847
+                    if (dbName.equals(rs.getString(1))) {
848
+                        dbExists = true;
849
+                        break;
850
+                    }
851
+                }
852
+            }
853
+            if (!dbExists) {
854
+                return false;
855
+            }
856
+            // 检查表是否存在
857
+            String sql = String.format("SHOW %s.TABLES", dbName);
858
+            try (ResultSet rs = stmt.executeQuery(sql)) {
859
+                while (rs.next()) {
860
+                    if (tableName.equals(rs.getString(1))) {
861
+                        return true;
862
+                    }
863
+                }
864
+            }
865
+            return false;
866
+        }
867
+    }
868
+
755 869
     public void close() {
756 870
         log.info("关闭 TdEngine 服务...");
757 871
         if (dataSource != null) {

+ 21
- 35
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Näytä tiedosto

@@ -25,32 +25,14 @@ public class VehicleSyncTask {
25 25
 
26 26
     private static final Logger log = LoggerFactory.getLogger(VehicleSyncTask.class);
27 27
 
28
-    private final SysCarService sysCarService;
29
-    private final SysDeviceService sysDeviceService;
30 28
     private final StringRedisTemplate stringRedisTemplate;
31 29
     private final SysRealtimeService sysrealtimeService;
32 30
 
33
-    private final SysDeviceVoService sysDeviceVoService;
34
-    private final SysDeviceControlService sysDeviceControlService;
35
-    private final RestTemplate restTemplate;
36
-    private final IotProperties iotProperties;
37
-
38
-    public VehicleSyncTask(SysCarService sysCarService,
39
-                           SysDeviceService sysDeviceService,
40
-                           StringRedisTemplate stringRedisTemplate,
41
-                           SysRealtimeService sysrealtimeService,
42
-                           SysDeviceVoService sysDeviceVoService,
43
-                           SysDeviceControlService sysDeviceControlService,
44
-                           RestTemplate restTemplate,
45
-                           IotProperties iotProperties) {
46
-        this.sysCarService = sysCarService;
47
-        this.sysDeviceService = sysDeviceService;
31
+    public VehicleSyncTask(StringRedisTemplate stringRedisTemplate,
32
+                           SysRealtimeService sysrealtimeService) {
33
+
48 34
         this.stringRedisTemplate = stringRedisTemplate;
49 35
         this.sysrealtimeService = sysrealtimeService;
50
-        this.sysDeviceVoService = sysDeviceVoService;
51
-        this.sysDeviceControlService = sysDeviceControlService;
52
-        this.restTemplate = restTemplate;
53
-        this.iotProperties = iotProperties;
54 36
     }
55 37
 
56 38
     private boolean tryLock(String lockKey, long expireSeconds) {
@@ -83,53 +65,57 @@ public class VehicleSyncTask {
83 65
     }
84 66
 
85 67
     private void doSyncRedisToMySQL() {
86
-        Set<String> activeKeys = stringRedisTemplate.opsForSet().members("DSB:active:devices");
68
+        Set<String> activeKeys = stringRedisTemplate.opsForSet().members("iot:dsb:active:devices");
87 69
         if (activeKeys == null || activeKeys.isEmpty()) return;
88 70
 
89 71
         for (String redisKey : activeKeys) {
90 72
             try {
91 73
                 Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
92 74
                 if (dataMap == null || dataMap.isEmpty()) {
93
-                    stringRedisTemplate.opsForSet().remove("DSB:active:devices", redisKey);
75
+                    stringRedisTemplate.opsForSet().remove("iot:dsb:active:devices", redisKey);
94 76
                     continue;
95 77
                 }
96 78
 
97
-                String[] parts = redisKey.split(":", 3);
98
-                if (parts.length != 3 || !"DSB".equals(parts[0])) {
99
-                    log.warn("跳过非法 key: {}", redisKey);
79
+                // redisKey 格式: pe_iot_<controllerId>_<deviceId>
80
+                // 从右往左取,避免 id 本身含下划线的问题
81
+                String[] parts = redisKey.split("_");
82
+                if (parts.length < 3) {
83
+                    log.warn("跳过非法 redis key: {}", redisKey);
100 84
                     continue;
101 85
                 }
102
-                String controllerId = parts[1];
86
+                String controllerId = parts[parts.length - 2];
87
+                String deviceId = parts[parts.length - 1];
88
+                String tableName = "pe_iot_" + controllerId.substring(0, 2);
103 89
 
104 90
                 try {
105
-                    sysrealtimeService.createRealtime(controllerId);
91
+                    sysrealtimeService.createRealtime(tableName);
106 92
                 } catch (Exception e) {
107
-                    log.error("创建表失败: {} | {}", controllerId, e.getMessage(), e);
93
+                    log.error("创建表失败: {} | {}", tableName, e.getMessage(), e);
108 94
                     continue;
109 95
                 }
110 96
 
111 97
                 String createTime = getStringValue(dataMap, "createTime");
112 98
                 String timestamp = getStringValue(dataMap, "timestamp");
113
-                String deviceId = getStringValue(dataMap, "device_id");
114
-                if (createTime == null || timestamp == null || deviceId == null) {
99
+
100
+                if (createTime == null || timestamp == null) {
115 101
                     continue;
116 102
                 }
117 103
 
118
-                List<String> existingKeys = sysrealtimeService.selectAllKeys(controllerId);
104
+                List<String> existingKeys = sysrealtimeService.selectAllKeys(tableName, controllerId);
119 105
                 Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
120 106
 
121 107
                 for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
122 108
                     String fieldKey = entry.getKey().toString();
123
-                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey)) {
109
+                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey) || "controller_id".equals(fieldKey)) {
124 110
                         continue;
125 111
                     }
126 112
                     String fieldValue = getStringValue(dataMap, fieldKey);
127 113
                     if (fieldValue == null) continue;
128 114
 
129 115
                     if (existingKeySet.contains(fieldKey)) {
130
-                        sysrealtimeService.updateTables(controllerId, createTime, fieldValue, timestamp, fieldKey, deviceId);
116
+                        sysrealtimeService.updateTables(tableName, createTime, fieldValue, timestamp, fieldKey, controllerId, deviceId);
131 117
                     } else {
132
-                        sysrealtimeService.insertTables(controllerId, createTime, deviceId, timestamp, fieldKey, fieldValue);
118
+                        sysrealtimeService.insertTables(tableName, createTime, controllerId, deviceId, timestamp, fieldKey, fieldValue);
133 119
                     }
134 120
                 }
135 121
             } catch (RedisConnectionFailureException e) {

+ 8
- 4
iot-platform/src/main/resources/mapper/SysRealtimeMapper.xml Näytä tiedosto

@@ -14,10 +14,14 @@
14 14
     <update id="createRealtime">
15 15
         CREATE TABLE IF NOT EXISTS `${tableName}` (
16 16
             create_time VARCHAR(255) NOT NULL COMMENT '时间戳',
17
+            controller_id VARCHAR(255) NOT NULL COMMENT '控制器id',
17 18
             device_id VARCHAR(255) NOT NULL COMMENT '设备id',
18 19
             timestamp VARCHAR(255) NOT NULL COMMENT '时间戳',
19 20
             k VARCHAR(255) NOT NULL COMMENT 'key',
20 21
             v VARCHAR(255) NOT NULL COMMENT '值',
22
+            INDEX idx_controller_id (controller_id),
23
+            INDEX idx_controller_device (controller_id, device_id),
24
+            INDEX idx_controller_k (controller_id, k),
21 25
             INDEX idx_device_id (device_id),
22 26
             INDEX idx_device_create_time (device_id, create_time),
23 27
             INDEX idx_k (k)
@@ -28,17 +32,17 @@
28 32
         SHOW TABLES;
29 33
     </select>
30 34
     <insert id="insertTables">
31
-        insert into `${tableName}`(create_time,device_id,timestamp,k,v) values(#{createTime},#{deviceId},#{timestamp},#{k},#{v})
35
+        insert into `${tableName}`(create_time,controller_id,device_id,timestamp,k,v) values(#{createTime},#{controllerId},#{deviceId},#{timestamp},#{k},#{v})
32 36
     </insert>
33 37
     <update id="updateTables">
34
-        update `${tableName}` set create_time=#{createTime},v=#{v},timestamp=#{timestamp} where k=#{k} and device_id=#{deviceId}
38
+        update `${tableName}` set create_time=#{createTime},v=#{v},timestamp=#{timestamp} where k=#{k} and device_id=#{deviceId} and controller_id=#{controllerId}
35 39
     </update>
36 40
 
37 41
     <select id="selectKey" resultType="Integer">
38
-        select COUNT(*) from `${tableName}` where k=#{k}
42
+        select COUNT(*) from `${tableName}` where k=#{k} and controller_id=#{controllerId}
39 43
     </select>
40 44
 
41 45
     <select id="selectAllKeys" resultType="String">
42
-        SELECT k FROM `${tableName}`
46
+        SELECT k FROM `${tableName}` WHERE controller_id=#{controllerId}
43 47
     </select>
44 48
 </mapper>

Loading…
Peruuta
Tallenna