浏览代码

Merge branch 'master' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511

 Conflicts:
	iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java
mqy20260511
lenovo 2 周前
父节点
当前提交
5c882198d6

+ 55
- 0
iot-platform/src/main/java/com/iot/platform/controller/IotController.java 查看文件

@@ -0,0 +1,55 @@
1
+package com.iot.platform.controller;
2
+
3
+import com.iot.platform.common.AjaxResult;
4
+import com.iot.platform.service.TdEngineService;
5
+import org.springframework.beans.factory.annotation.Autowired;
6
+import org.springframework.data.redis.core.StringRedisTemplate;
7
+import org.springframework.web.bind.annotation.*;
8
+
9
+import java.util.Map;
10
+
11
+@RestController
12
+@RequestMapping("/iot")
13
+public class IotController {
14
+
15
+    @Autowired
16
+    private StringRedisTemplate stringRedisTemplate;
17
+
18
+    @Autowired
19
+    private TdEngineService tdEngineService;
20
+
21
+    /**
22
+     * 根据 controllerId、deviceId 和 key 查询 Redis 中存储的数据
23
+     */
24
+    @GetMapping("/redis/query")
25
+    public AjaxResult queryRedisData(@RequestParam String controllerId,
26
+                                     @RequestParam String deviceId,
27
+                                     @RequestParam(required = false) String key) {
28
+        String redisKey = "pe_iot_" + controllerId + "_" + deviceId;
29
+
30
+        if (key == null || key.trim().isEmpty()) {
31
+            // 查询整个 hash
32
+            Map<Object, Object> data = stringRedisTemplate.opsForHash().entries(redisKey);
33
+            return AjaxResult.success(data);
34
+        } else {
35
+            // 查询指定 field
36
+            Object value = stringRedisTemplate.opsForHash().get(redisKey, key);
37
+            return AjaxResult.success(value);
38
+        }
39
+    }
40
+
41
+    /**
42
+     * 根据 controllerId、deviceId 和 key 查询 TDengine 中存储的数据
43
+     */
44
+    @GetMapping("/tdengine/query")
45
+    public AjaxResult queryTdEngineData(@RequestParam String controllerId,
46
+                                        @RequestParam String deviceId,
47
+                                        @RequestParam(required = false) String key) {
48
+        try {
49
+            Object result = tdEngineService.queryLatestData(controllerId, deviceId, key);
50
+            return AjaxResult.success(result);
51
+        } catch (Exception e) {
52
+            return AjaxResult.error("查询TDengine失败: " + e.getMessage());
53
+        }
54
+    }
55
+}

+ 4
- 4
iot-platform/src/main/java/com/iot/platform/mapper/SysRealtimeMapper.java 查看文件

@@ -9,8 +9,8 @@ import java.util.List;
9 9
 public interface SysRealtimeMapper {
10 10
     void createRealtime(@Param("tableName")String tableName);
11 11
     List<String> selectTables();
12
-    void insertTables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("deviceId")String deviceId,@Param("timestamp")String timestamp,@Param("k")String k,@Param("v")Object v);
13
-    void updateTables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("v")Object v,@Param("timestamp")String timestamp,@Param("k")String k,@Param("deviceId")String deviceId);
14
-    Integer selectKey(@Param("tableName")String tableName,@Param("k")String k);
15
-    List<String> selectAllKeys(@Param("tableName")String tableName);
12
+    void insertTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId, @Param("timestamp")String timestamp, @Param("k")String k, @Param("v")Object v);
13
+    void updateTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("v")Object v, @Param("timestamp")String timestamp, @Param("k")String k, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId);
14
+    Integer selectKey(@Param("tableName")String tableName, @Param("k")String k, @Param("controllerId")String controllerId);
15
+    List<String> selectAllKeys(@Param("tableName")String tableName, @Param("controllerId")String controllerId);
16 16
 }

+ 10
- 16
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java 查看文件

@@ -12,7 +12,6 @@ import org.springframework.context.annotation.DependsOn;
12 12
 import org.springframework.data.redis.core.StringRedisTemplate;
13 13
 import org.springframework.stereotype.Component;
14 14
 
15
-import java.time.LocalDate;
16 15
 import java.time.LocalDateTime;
17 16
 import java.time.format.DateTimeFormatter;
18 17
 import java.util.*;
@@ -106,18 +105,16 @@ public class MqttDynamicConsumer extends AbstractDynamicMqttConsumer {
106 105
         data.put("device_id", ctx.deviceId);
107 106
         List<Map<String, Object>> batch = Collections.singletonList(data);
108 107
 
109
-        String dbNamePrefix = "pe_iot_"+ctx.dbName.substring(0, 2);
110
-        String controllerId = ctx.dbName;
111
-        tdengineService.insertBatch(dbNamePrefix, ctx.superTable, controllerId, batch);
108
+        String dbNamePrefix = "pe_iot_"+ctx.controllerId.substring(0, 2);
109
+        String controllerId = ctx.controllerId;
110
+        tdengineService.insertBatch(dbNamePrefix, ctx.deviceId, controllerId, batch);
112 111
     }
113 112
 
114 113
     private void writeToRedis(MessageContext ctx) {
115
-        String redisKey = "DSB:" + ctx.dbName + ":" + ctx.controllerName;
114
+        String redisKey = "pe_iot_" + ctx.controllerId + "_" + ctx.deviceId;
116 115
 
117 116
         Map<String, String> hashData = new HashMap<>();
118
-        hashData.put("createTime", ctx.currentTime);
119 117
         hashData.put("timestamp", ctx.timestamp);
120
-        hashData.put("device_id", ctx.deviceId);
121 118
 
122 119
         for (Map.Entry<String, Object> entry : ctx.metricData.entrySet()) {
123 120
             if (entry.getValue() != null) {
@@ -126,25 +123,22 @@ public class MqttDynamicConsumer extends AbstractDynamicMqttConsumer {
126 123
         }
127 124
 
128 125
         stringRedisTemplate.opsForHash().putAll(redisKey, hashData);
129
-        stringRedisTemplate.expire(redisKey, 2, TimeUnit.HOURS);
130
-        stringRedisTemplate.opsForSet().add("DSB:active:devices", redisKey);
131
-        stringRedisTemplate.expire("DSB:active:devices", 2, TimeUnit.HOURS);
126
+        stringRedisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
127
+        stringRedisTemplate.opsForSet().add("iot:dsb:active:devices", redisKey);
132 128
     }
133 129
 
134 130
     private static class MessageContext {
135
-        final String dbName;
136
-        final String superTable;
131
+        final String controllerId;
132
+        final String deviceId;
137 133
         final String controllerName;
138 134
         final JSONObject metricData;
139 135
         final String currentTime;
140 136
         final String timestamp;
141
-        final String deviceId;
142 137
 
143
-        MessageContext(String dbName, String superTable, String controllerName,
138
+        MessageContext(String controllerId, String devcieId, String controllerName,
144 139
                        JSONObject metricData, String currentTime,
145 140
                        String timestamp, String deviceId) {
146
-            this.dbName = dbName;
147
-            this.superTable = superTable;
141
+            this.controllerId = controllerId;
148 142
             this.controllerName = controllerName;
149 143
             this.metricData = metricData;
150 144
             this.currentTime = currentTime;

+ 8
- 8
iot-platform/src/main/java/com/iot/platform/service/SysRealtimeService.java 查看文件

@@ -30,24 +30,24 @@ public class SysRealtimeService {
30 30
         return sysRealtimeMapper.selectTables();
31 31
     }
32 32
 
33
-    public void insertTables(String tableName, String createTime, String deviceId, String timestamp, String k, Object v) {
33
+    public void insertTables(String tableName, String createTime, String controllerId, String deviceId, String timestamp, String k, Object v) {
34 34
         validateTableName(tableName);
35
-        sysRealtimeMapper.insertTables(tableName, createTime, deviceId, timestamp, k, v);
35
+        sysRealtimeMapper.insertTables(tableName, createTime, controllerId, deviceId, timestamp, k, v);
36 36
     }
37 37
 
38
-    public void updateTables(String tableName, String createTime, Object v, String timestamp, String k, String deviceId) {
38
+    public void updateTables(String tableName, String createTime, Object v, String timestamp, String k, String controllerId, String deviceId) {
39 39
         validateTableName(tableName);
40
-        sysRealtimeMapper.updateTables(tableName, createTime, v, timestamp, k, deviceId);
40
+        sysRealtimeMapper.updateTables(tableName, createTime, v, timestamp, k, controllerId, deviceId);
41 41
     }
42 42
 
43
-    public Integer selectKey(String tableName, String k) {
43
+    public Integer selectKey(String tableName, String k, String controllerId) {
44 44
         validateTableName(tableName);
45
-        return sysRealtimeMapper.selectKey(tableName, k);
45
+        return sysRealtimeMapper.selectKey(tableName, k, controllerId);
46 46
     }
47 47
 
48
-    public List<String> selectAllKeys(String tableName) {
48
+    public List<String> selectAllKeys(String tableName, String controllerId) {
49 49
         validateTableName(tableName);
50
-        return sysRealtimeMapper.selectAllKeys(tableName);
50
+        return sysRealtimeMapper.selectAllKeys(tableName, controllerId);
51 51
     }
52 52
 
53 53
     private void validateTableName(String tableName) {

+ 140
- 29
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java 查看文件

@@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
8 8
 import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.stereotype.Service;
10 10
 
11
+import java.nio.charset.StandardCharsets;
11 12
 import java.math.BigDecimal;
12 13
 import java.sql.*;
13 14
 import java.time.ZoneOffset;
@@ -48,11 +49,8 @@ public class TdEngineService {
48 49
     // 每批次最大列数(防止 TDengine 行超限)
49 50
     private static final int MAX_COLUMNS_PER_INSERT = 100;
50 51
 
51
-    // 拆分超级表的列数阈值
52
-    private static final int SPLIT_STABLE_COLUMN_THRESHOLD = 100;
53
-
54
-    // 默认 VARCHAR 字段长度限制
55
-    private static final int DEFAULT_VARCHAR_LENGTH = 128;
52
+    // 默认 VARCHAR 字段长度限制(用于未提供实际长度时的兜底)
53
+    private static final int DEFAULT_VARCHAR_LENGTH = 16;
56 54
 
57 55
     // 东八区时区偏移(避免重复创建)
58 56
     private static final ZoneOffset ZONE_OFFSET_8 = ZoneOffset.of("+8");
@@ -292,17 +290,18 @@ public class TdEngineService {
292 290
     private void splitAndInsertToMultipleStables(String dbName, String stableName, String controllerId,
293 291
                                                  List<Map<String, Object>> dataList,
294 292
                                                  Map<String, String> columnTypes) throws SQLException {
293
+
295 294
         // 按首字符 UTF-8 值分组
296
-        Map<Integer, Map<String, String>> groupColumnTypes = new LinkedHashMap<>();
295
+        Map<Integer, Map<String, String>> groupColumnTypeMap = new LinkedHashMap<>();
297 296
         for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
298 297
             int groupId = getFirstCharGroupId(entry.getKey());
299
-            groupColumnTypes.computeIfAbsent(groupId, k -> new LinkedHashMap<>()).put(entry.getKey(), entry.getValue());
298
+            groupColumnTypeMap.computeIfAbsent(groupId, k -> new LinkedHashMap<>()).put(entry.getKey(), entry.getValue());
300 299
         }
301 300
 
302
-        log.info("拆分后超级表数量: {} | 分组: {}", groupColumnTypes.size(), groupColumnTypes.keySet());
301
+        log.info("拆分后超级表数量: {} | 分组: {}", groupColumnTypeMap.size(), groupColumnTypeMap.keySet());
303 302
 
304 303
         // 每个分组作为一个超级表,处理其对应的列和数据
305
-        for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypes.entrySet()) {
304
+        for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypeMap.entrySet()) {
306 305
             int groupId = group.getKey();
307 306
             // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
308 307
             String newStableName = stableName+"_"+groupId;
@@ -330,7 +329,7 @@ public class TdEngineService {
330 329
             return 0;
331 330
         }
332 331
         String firstChar = columnName.substring(0, 1);
333
-        byte[] bytes = firstChar.getBytes(java.nio.charset.StandardCharsets.UTF_8);
332
+        byte[] bytes = firstChar.getBytes(StandardCharsets.UTF_8);
334 333
         return (bytes[0] & 0xFF) % 10;
335 334
     }
336 335
 
@@ -340,7 +339,9 @@ public class TdEngineService {
340 339
     private List<Map<String, Object>> filterDataByColumnGroup(List<Map<String, Object>> dataList, Set<String> columns) {
341 340
         List<Map<String, Object>> filtered = new ArrayList<>();
342 341
         for (Map<String, Object> data : dataList) {
343
-            if (data == null) continue;
342
+            if (data == null) {
343
+                continue;
344
+            }
344 345
             Map<String, Object> filteredRow = new LinkedHashMap<>();
345 346
             for (String col : columns) {
346 347
                 if (data.containsKey(col)) {
@@ -352,11 +353,6 @@ public class TdEngineService {
352 353
         return filtered;
353 354
     }
354 355
 
355
-    private String extractSuperTableName(String table) {
356
-        int idx = table.lastIndexOf('_');
357
-        return idx > 0 ? table.substring(0, idx) : table;
358
-    }
359
-
360 356
     /**
361 357
      * 收集数据中所有动态列及其类型
362 358
      */
@@ -379,6 +375,31 @@ public class TdEngineService {
379 375
     }
380 376
 
381 377
     /**
378
+     * 收集数据中所有动态列的最大字符串长度(用于动态 VARCHAR 长度)
379
+     */
380
+    private Map<String, Integer> collectColumnMaxLengths(List<Map<String, Object>> dataList) {
381
+        Map<String, Integer> maxLengths = new HashMap<>();
382
+        for (Map<String, Object> data : dataList) {
383
+            if (data == null) {
384
+                continue;
385
+            }
386
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
387
+                String key = entry.getKey();
388
+                if (!isValidFieldName(key) || isReservedColumn(key)) {
389
+                    continue;
390
+                }
391
+                Object value = entry.getValue();
392
+                if (value != null) {
393
+                    int len = value.toString().length();
394
+                    maxLengths.merge(key, len, Math::max);
395
+                }
396
+            }
397
+        }
398
+        log.debug("收集到的列最大长度: {}", maxLengths);
399
+        return maxLengths;
400
+    }
401
+
402
+    /**
382 403
      * 构建批量插入 SQL
383 404
      */
384 405
     private String buildInsertSql(String dbName, String table, String superTableName,
@@ -432,7 +453,7 @@ public class TdEngineService {
432 453
         if (!hasData) {
433 454
             return null;
434 455
         }
435
-        log.info("生成的 INSERT SQL | 列类型: {} | SQL 前100字符: {}", columnTypes, sql.toString().substring(0, Math.min(100, sql.toString().length())));
456
+        log.info("生成的 INSERT SQL | 列类型: {} | SQL 前100字符: {}", columnTypes, sql.substring(0, Math.min(100, sql.toString().length())));
436 457
         sql.setLength(sql.length() - 1);
437 458
         return sql.toString();
438 459
     }
@@ -456,6 +477,8 @@ public class TdEngineService {
456 477
             return;
457 478
         }
458 479
 
480
+        Map<String, Integer> columnMaxLengths = collectColumnMaxLengths(dataList);
481
+
459 482
         log.info("收集到的列类型: {}", columnTypes);
460 483
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
461 484
         log.info("超级表已有列: {}", existingColumns);
@@ -473,7 +496,7 @@ public class TdEngineService {
473 496
             return;
474 497
         }
475 498
 
476
-        ensureColumnsExist(dbName, superTableName, columnTypes);
499
+        ensureColumnsExist(dbName, superTableName, columnTypes, columnMaxLengths);
477 500
 
478 501
         existingColumns = getStableColumns(dbName, superTableName);
479 502
         if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
@@ -564,7 +587,7 @@ public class TdEngineService {
564 587
     /**
565 588
      * 根据 TdEngine 列类型获取创建列的 SQL 类型
566 589
      */
567
-    private String getColumnTypeForDDL(String tdType, String columnName) {
590
+    private String getColumnTypeForDDL(String tdType, Integer maxLen) {
568 591
         switch (tdType) {
569 592
             case "BOOL":
570 593
             case "BIGINT":
@@ -573,7 +596,8 @@ public class TdEngineService {
573 596
                 return tdType;
574 597
             case "VARCHAR":
575 598
             default:
576
-                return "VARCHAR(" + DEFAULT_VARCHAR_LENGTH + ")";
599
+                int len = (maxLen != null && maxLen > 0) ? maxLen + 5 : DEFAULT_VARCHAR_LENGTH;
600
+                return "VARCHAR(" + len + ")";
577 601
         }
578 602
     }
579 603
 
@@ -617,20 +641,16 @@ public class TdEngineService {
617 641
             }
618 642
         }
619 643
 
620
-        // 字符串类型校验长度
621
-        if (strValue.length() > DEFAULT_VARCHAR_LENGTH) {
622
-            log.debug("字段值超长,截断存储 | 列: {} | 值长度: {} | 最大: {} | 截断后: {}...",
623
-                    columnName, strValue.length(), DEFAULT_VARCHAR_LENGTH, strValue.substring(0, DEFAULT_VARCHAR_LENGTH));
624
-            strValue = strValue.substring(0, DEFAULT_VARCHAR_LENGTH);
625
-        }
644
+        // 字符串类型:使用实际值长度 + 5 冗余,不截断
645
+        // (TDengine VARCHAR 实际限制在创建列时已保证充足)
626 646
         return "'" + escapeValue(strValue) + "'";
627 647
     }
628 648
 
629 649
     /**
630 650
      * 确保列存在,如有新列则 ALTER 添加(根据类型添加对应列)
631 651
      */
632
-    private void ensureColumnsExist(String dbName, String superTableName, Map<String, String> columnTypes)
633
-            throws SQLException {
652
+    private void ensureColumnsExist(String dbName, String superTableName, Map<String, String> columnTypes,
653
+                                     Map<String, Integer> columnMaxLengths) throws SQLException {
634 654
 
635 655
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
636 656
 
@@ -646,6 +666,7 @@ public class TdEngineService {
646 666
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
647 667
                 String col = entry.getKey();
648 668
                 String colType = entry.getValue();
669
+                Integer maxLen = columnMaxLengths != null ? columnMaxLengths.get(col) : null;
649 670
 
650 671
                 // 再次检查列数上限
651 672
                 if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
@@ -655,7 +676,7 @@ public class TdEngineService {
655 676
                 }
656 677
 
657 678
                 if (!existingColumns.contains(col)) {
658
-                    String ddlType = getColumnTypeForDDL(colType, col);
679
+                    String ddlType = getColumnTypeForDDL(colType, maxLen);
659 680
                     String alterSql = String.format(
660 681
                             "ALTER STABLE %s.%s ADD COLUMN %s %s",
661 682
                             wrapName(dbName),
@@ -752,6 +773,96 @@ public class TdEngineService {
752 773
         log.info("清除了 TdEngine 超级表结构缓存");
753 774
     }
754 775
 
776
+    /**
777
+     * 根据 controllerId、deviceId 查询 TDengine 最新数据
778
+     * @param controllerId 控制器ID
779
+     * @param deviceId 设备ID
780
+     * @param key 列名,不传则返回所有列
781
+     */
782
+    public Object queryLatestData(String controllerId, String deviceId, String key) throws SQLException {
783
+        // 构建 dbName 前缀
784
+        String dbNamePrefix = "pe_iot_" + controllerId.substring(0, 2);
785
+
786
+        // 遍历 10 个分组查找数据
787
+        for (int groupId = 0; groupId < 10; groupId++) {
788
+            // 表名格式: {controllerId}_{groupId}_{deviceId}
789
+            String tableName = controllerId + "_" + groupId + "_" + deviceId;
790
+
791
+            // 检查表是否存在
792
+            if (!isTableExists(dbNamePrefix, tableName)) {
793
+                continue;
794
+            }
795
+
796
+            // 构建查询 SQL
797
+            String colPart = (key != null && !key.trim().isEmpty()) ? wrapName(key) : "*";
798
+            String sql = String.format("SELECT %s FROM %s.%s ORDER BY ts DESC LIMIT 1",
799
+                    colPart, wrapName(dbNamePrefix), wrapName(tableName));
800
+
801
+            try (Connection conn = getConnection();
802
+                 Statement stmt = conn.createStatement();
803
+                 ResultSet rs = stmt.executeQuery(sql)) {
804
+
805
+                if (rs.next()) {
806
+                    if (key != null && !key.trim().isEmpty()) {
807
+                        // 返回指定列的值
808
+                        return rs.getObject(1);
809
+                    } else {
810
+                        // 返回整行数据
811
+                        java.util.Map<String, Object> row = new java.util.LinkedHashMap<>();
812
+                        ResultSetMetaData metaData = rs.getMetaData();
813
+                        int columnCount = metaData.getColumnCount();
814
+                        for (int i = 1; i <= columnCount; i++) {
815
+                            row.put(metaData.getColumnLabel(i), rs.getObject(i));
816
+                        }
817
+                        return row;
818
+                    }
819
+                }
820
+            } catch (SQLException e) {
821
+                // 列不存在,继续查找下一个分组
822
+                if (e.getMessage().contains("Invalid column")) {
823
+                    log.debug("表 {} 中无列 {},继续查找其他分组", tableName, key);
824
+                    continue;
825
+                }
826
+                throw e;
827
+            }
828
+        }
829
+        return null;
830
+    }
831
+
832
+    /**
833
+     * 检查表是否存在
834
+     */
835
+    private boolean isTableExists(String dbName, String tableName) throws SQLException {
836
+        // 先检查数据库是否存在
837
+        String checkDbSql = "SHOW DATABASES";
838
+        try (Connection conn = getConnection();
839
+             Statement stmt = conn.createStatement()) {
840
+            stmt.setQueryTimeout(5);
841
+            boolean dbExists = false;
842
+            try (ResultSet rs = stmt.executeQuery(checkDbSql)) {
843
+                while (rs.next()) {
844
+                    if (dbName.equals(rs.getString(1))) {
845
+                        dbExists = true;
846
+                        break;
847
+                    }
848
+                }
849
+            }
850
+            if (!dbExists) {
851
+                return false;
852
+            }
853
+            // 检查表是否存在
854
+            String sql = String.format("SHOW %s.TABLES", dbName);
855
+            try (ResultSet rs = stmt.executeQuery(sql)) {
856
+                while (rs.next()) {
857
+                    if (tableName.equals(rs.getString(1))) {
858
+                        return true;
859
+                    }
860
+                }
861
+            }
862
+            return false;
863
+        }
864
+    }
865
+
755 866
     public void close() {
756 867
         log.info("关闭 TdEngine 服务...");
757 868
         if (dataSource != null) {

+ 19
- 34
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java 查看文件

@@ -25,32 +25,14 @@ public class VehicleSyncTask {
25 25
 
26 26
     private static final Logger log = LoggerFactory.getLogger(VehicleSyncTask.class);
27 27
 
28
-    private final SysCarService sysCarService;
29
-    private final SysDeviceService sysDeviceService;
30 28
     private final StringRedisTemplate stringRedisTemplate;
31 29
     private final SysRealtimeService sysrealtimeService;
32 30
 
33
-    private final SysDeviceVoService sysDeviceVoService;
34
-    private final SysDeviceControlService sysDeviceControlService;
35
-    private final RestTemplate restTemplate;
36
-    private final IotProperties iotProperties;
37
-
38
-    public VehicleSyncTask(SysCarService sysCarService,
39
-                           SysDeviceService sysDeviceService,
40
-                           StringRedisTemplate stringRedisTemplate,
41
-                           SysRealtimeService sysrealtimeService,
42
-                           SysDeviceVoService sysDeviceVoService,
43
-                           SysDeviceControlService sysDeviceControlService,
44
-                           RestTemplate restTemplate,
45
-                           IotProperties iotProperties) {
46
-        this.sysCarService = sysCarService;
47
-        this.sysDeviceService = sysDeviceService;
31
+    public VehicleSyncTask(StringRedisTemplate stringRedisTemplate,
32
+                           SysRealtimeService sysrealtimeService) {
33
+
48 34
         this.stringRedisTemplate = stringRedisTemplate;
49 35
         this.sysrealtimeService = sysrealtimeService;
50
-        this.sysDeviceVoService = sysDeviceVoService;
51
-        this.sysDeviceControlService = sysDeviceControlService;
52
-        this.restTemplate = restTemplate;
53
-        this.iotProperties = iotProperties;
54 36
     }
55 37
 
56 38
     private boolean tryLock(String lockKey, long expireSeconds) {
@@ -83,53 +65,56 @@ public class VehicleSyncTask {
83 65
     }
84 66
 
85 67
     private void doSyncRedisToMySQL() {
86
-        Set<String> activeKeys = stringRedisTemplate.opsForSet().members("DSB:active:devices");
68
+        Set<String> activeKeys = stringRedisTemplate.opsForSet().members("iot:dsb:active:devices");
87 69
         if (activeKeys == null || activeKeys.isEmpty()) return;
88 70
 
89 71
         for (String redisKey : activeKeys) {
90 72
             try {
91 73
                 Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
92 74
                 if (dataMap == null || dataMap.isEmpty()) {
93
-                    stringRedisTemplate.opsForSet().remove("DSB:active:devices", redisKey);
75
+                    stringRedisTemplate.opsForSet().remove("iot:dsb:active:devices", redisKey);
94 76
                     continue;
95 77
                 }
96 78
 
97
-                String[] parts = redisKey.split(":", 3);
98
-                if (parts.length != 3 || !"DSB".equals(parts[0])) {
99
-                    log.warn("跳过非法 key: {}", redisKey);
79
+                // redisKey 格式: pe_iot_<controllerId>_<deviceId>
80
+                String[] parts = redisKey.split("_");
81
+                if (parts.length < 3) {
82
+                    log.warn("跳过非法 redis key: {}", redisKey);
100 83
                     continue;
101 84
                 }
102 85
                 String controllerId = parts[1];
86
+                String deviceId = parts[2];
87
+                String tableName = "pe_iot_" + controllerId.substring(0, 2);
103 88
 
104 89
                 try {
105
-                    sysrealtimeService.createRealtime(controllerId);
90
+                    sysrealtimeService.createRealtime(tableName);
106 91
                 } catch (Exception e) {
107
-                    log.error("创建表失败: {} | {}", controllerId, e.getMessage(), e);
92
+                    log.error("创建表失败: {} | {}", tableName, e.getMessage(), e);
108 93
                     continue;
109 94
                 }
110 95
 
111 96
                 String createTime = getStringValue(dataMap, "createTime");
112 97
                 String timestamp = getStringValue(dataMap, "timestamp");
113
-                String deviceId = getStringValue(dataMap, "device_id");
114
-                if (createTime == null || timestamp == null || deviceId == null) {
98
+
99
+                if (createTime == null || timestamp == null) {
115 100
                     continue;
116 101
                 }
117 102
 
118
-                List<String> existingKeys = sysrealtimeService.selectAllKeys(controllerId);
103
+                List<String> existingKeys = sysrealtimeService.selectAllKeys(tableName, controllerId);
119 104
                 Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
120 105
 
121 106
                 for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
122 107
                     String fieldKey = entry.getKey().toString();
123
-                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey)) {
108
+                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey) || "controller_id".equals(fieldKey)) {
124 109
                         continue;
125 110
                     }
126 111
                     String fieldValue = getStringValue(dataMap, fieldKey);
127 112
                     if (fieldValue == null) continue;
128 113
 
129 114
                     if (existingKeySet.contains(fieldKey)) {
130
-                        sysrealtimeService.updateTables(controllerId, createTime, fieldValue, timestamp, fieldKey, deviceId);
115
+                        sysrealtimeService.updateTables(tableName, createTime, fieldValue, timestamp, fieldKey, controllerId, deviceId);
131 116
                     } else {
132
-                        sysrealtimeService.insertTables(controllerId, createTime, deviceId, timestamp, fieldKey, fieldValue);
117
+                        sysrealtimeService.insertTables(tableName, createTime, controllerId, deviceId, timestamp, fieldKey, fieldValue);
133 118
                     }
134 119
                 }
135 120
             } catch (RedisConnectionFailureException e) {

+ 8
- 4
iot-platform/src/main/resources/mapper/SysRealtimeMapper.xml 查看文件

@@ -14,10 +14,14 @@
14 14
     <update id="createRealtime">
15 15
         CREATE TABLE IF NOT EXISTS `${tableName}` (
16 16
             create_time VARCHAR(255) NOT NULL COMMENT '时间戳',
17
+            controller_id VARCHAR(255) NOT NULL COMMENT '控制器id',
17 18
             device_id VARCHAR(255) NOT NULL COMMENT '设备id',
18 19
             timestamp VARCHAR(255) NOT NULL COMMENT '时间戳',
19 20
             k VARCHAR(255) NOT NULL COMMENT 'key',
20 21
             v VARCHAR(255) NOT NULL COMMENT '值',
22
+            INDEX idx_controller_id (controller_id),
23
+            INDEX idx_controller_device (controller_id, device_id),
24
+            INDEX idx_controller_k (controller_id, k),
21 25
             INDEX idx_device_id (device_id),
22 26
             INDEX idx_device_create_time (device_id, create_time),
23 27
             INDEX idx_k (k)
@@ -28,17 +32,17 @@
28 32
         SHOW TABLES;
29 33
     </select>
30 34
     <insert id="insertTables">
31
-        insert into `${tableName}`(create_time,device_id,timestamp,k,v) values(#{createTime},#{deviceId},#{timestamp},#{k},#{v})
35
+        insert into `${tableName}`(create_time,controller_id,device_id,timestamp,k,v) values(#{createTime},#{controllerId},#{deviceId},#{timestamp},#{k},#{v})
32 36
     </insert>
33 37
     <update id="updateTables">
34
-        update `${tableName}` set create_time=#{createTime},v=#{v},timestamp=#{timestamp} where k=#{k} and device_id=#{deviceId}
38
+        update `${tableName}` set create_time=#{createTime},v=#{v},timestamp=#{timestamp} where k=#{k} and device_id=#{deviceId} and controller_id=#{controllerId}
35 39
     </update>
36 40
 
37 41
     <select id="selectKey" resultType="Integer">
38
-        select COUNT(*) from `${tableName}` where k=#{k}
42
+        select COUNT(*) from `${tableName}` where k=#{k} and controller_id=#{controllerId}
39 43
     </select>
40 44
 
41 45
     <select id="selectAllKeys" resultType="String">
42
-        SELECT k FROM `${tableName}`
46
+        SELECT k FROM `${tableName}` WHERE controller_id=#{controllerId}
43 47
     </select>
44 48
 </mapper>

正在加载...
取消
保存