Просмотр исходного кода

Merge remote-tracking branch 'origin/mqy20260511'

mqy20260511
humanleft 2 недель назад
Родитель
Сommit
d3c6084a83

+ 55
- 0
iot-platform/src/main/java/com/iot/platform/controller/IotController.java Просмотреть файл

1
+package com.iot.platform.controller;
2
+
3
+import com.iot.platform.common.AjaxResult;
4
+import com.iot.platform.service.TdEngineService;
5
+import org.springframework.beans.factory.annotation.Autowired;
6
+import org.springframework.data.redis.core.StringRedisTemplate;
7
+import org.springframework.web.bind.annotation.*;
8
+
9
+import java.util.Map;
10
+
11
+@RestController
12
+@RequestMapping("/iot")
13
+public class IotController {
14
+
15
+    @Autowired
16
+    private StringRedisTemplate stringRedisTemplate;
17
+
18
+    @Autowired
19
+    private TdEngineService tdEngineService;
20
+
21
+    /**
22
+     * 根据 controllerId、deviceId 和 key 查询 Redis 中存储的数据
23
+     */
24
+    @GetMapping("/redis/query")
25
+    public AjaxResult queryRedisData(@RequestParam String controllerId,
26
+                                     @RequestParam String deviceId,
27
+                                     @RequestParam(required = false) String key) {
28
+        String redisKey = "pe_iot_" + controllerId + "_" + deviceId;
29
+
30
+        if (key == null || key.trim().isEmpty()) {
31
+            // 查询整个 hash
32
+            Map<Object, Object> data = stringRedisTemplate.opsForHash().entries(redisKey);
33
+            return AjaxResult.success(data);
34
+        } else {
35
+            // 查询指定 field
36
+            Object value = stringRedisTemplate.opsForHash().get(redisKey, key);
37
+            return AjaxResult.success(value);
38
+        }
39
+    }
40
+
41
+    /**
42
+     * 根据 controllerId、deviceId 和 key 查询 TDengine 中存储的数据
43
+     */
44
+    @GetMapping("/tdengine/query")
45
+    public AjaxResult queryTdEngineData(@RequestParam String controllerId,
46
+                                        @RequestParam String deviceId,
47
+                                        @RequestParam(required = false) String key) {
48
+        try {
49
+            Object result = tdEngineService.queryLatestData(controllerId, deviceId, key);
50
+            return AjaxResult.success(result);
51
+        } catch (Exception e) {
52
+            return AjaxResult.error("查询TDengine失败: " + e.getMessage());
53
+        }
54
+    }
55
+}

+ 4
- 4
iot-platform/src/main/java/com/iot/platform/mapper/SysRealtimeMapper.java Просмотреть файл

9
 public interface SysRealtimeMapper {
9
 public interface SysRealtimeMapper {
10
     void createRealtime(@Param("tableName")String tableName);
10
     void createRealtime(@Param("tableName")String tableName);
11
     List<String> selectTables();
11
     List<String> selectTables();
12
-    void insertTables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("deviceId")String deviceId,@Param("timestamp")String timestamp,@Param("k")String k,@Param("v")Object v);
13
-    void updateTables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("v")Object v,@Param("timestamp")String timestamp,@Param("k")String k,@Param("deviceId")String deviceId);
14
-    Integer selectKey(@Param("tableName")String tableName,@Param("k")String k);
15
-    List<String> selectAllKeys(@Param("tableName")String tableName);
12
+    void insertTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId, @Param("timestamp")String timestamp, @Param("k")String k, @Param("v")Object v);
13
+    void updateTables(@Param("tableName")String tableName, @Param("createTime")String createTime, @Param("v")Object v, @Param("timestamp")String timestamp, @Param("k")String k, @Param("controllerId")String controllerId, @Param("deviceId")String deviceId);
14
+    Integer selectKey(@Param("tableName")String tableName, @Param("k")String k, @Param("controllerId")String controllerId);
15
+    List<String> selectAllKeys(@Param("tableName")String tableName, @Param("controllerId")String controllerId);
16
 }
16
 }

+ 1
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Просмотреть файл

79
 //        superTable + "_" +
79
 //        superTable + "_" +
80
 //        int i = ((int) "g".getBytes("UTF-8")[0]) % 10;
80
 //        int i = ((int) "g".getBytes("UTF-8")[0]) % 10;
81
 
81
 
82
-        tdengineService.insertBatch(dbNamePrefix, superTable,tableName, batchToInsert);
82
+        tdengineService.insertBatch(dbNamePrefix, superTable, tableName, batchToInsert);
83
     }
83
     }
84
 }
84
 }

+ 10
- 15
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Просмотреть файл

105
         data.put("device_id", ctx.deviceId);
105
         data.put("device_id", ctx.deviceId);
106
         List<Map<String, Object>> batch = Collections.singletonList(data);
106
         List<Map<String, Object>> batch = Collections.singletonList(data);
107
 
107
 
108
-        String dbNamePrefix = "pe_iot_"+ctx.dbName.substring(0, 2);
109
-        String tableName = ctx.dbName;
110
-        tdengineService.insertBatch(dbNamePrefix, ctx.superTable,tableName, batch);
108
+        String dbNamePrefix = "pe_iot_"+ctx.controllerId.substring(0, 2);
109
+        String controllerId = ctx.controllerId;
110
+        tdengineService.insertBatch(dbNamePrefix, ctx.deviceId, controllerId, batch);
111
     }
111
     }
112
 
112
 
113
     private void writeToRedis(MessageContext ctx) {
113
     private void writeToRedis(MessageContext ctx) {
114
-        String redisKey = "DSB:" + ctx.dbName + ":" + ctx.controllerName;
114
+        String redisKey = "pe_iot_" + ctx.controllerId + "_" + ctx.deviceId;
115
 
115
 
116
         Map<String, String> hashData = new HashMap<>();
116
         Map<String, String> hashData = new HashMap<>();
117
-        hashData.put("createTime", ctx.currentTime);
118
         hashData.put("timestamp", ctx.timestamp);
117
         hashData.put("timestamp", ctx.timestamp);
119
-        hashData.put("device_id", ctx.deviceId);
120
 
118
 
121
         for (Map.Entry<String, Object> entry : ctx.metricData.entrySet()) {
119
         for (Map.Entry<String, Object> entry : ctx.metricData.entrySet()) {
122
             if (entry.getValue() != null) {
120
             if (entry.getValue() != null) {
125
         }
123
         }
126
 
124
 
127
         stringRedisTemplate.opsForHash().putAll(redisKey, hashData);
125
         stringRedisTemplate.opsForHash().putAll(redisKey, hashData);
128
-        stringRedisTemplate.expire(redisKey, 2, TimeUnit.HOURS);
129
-        stringRedisTemplate.opsForSet().add("DSB:active:devices", redisKey);
130
-        stringRedisTemplate.expire("DSB:active:devices", 2, TimeUnit.HOURS);
126
+        stringRedisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
127
+        stringRedisTemplate.opsForSet().add("iot:dsb:active:devices", redisKey);
131
     }
128
     }
132
 
129
 
133
     private static class MessageContext {
130
     private static class MessageContext {
134
-        final String dbName;
135
-        final String superTable;
131
+        final String controllerId;
132
+        final String deviceId;
136
         final String controllerName;
133
         final String controllerName;
137
         final JSONObject metricData;
134
         final JSONObject metricData;
138
         final String currentTime;
135
         final String currentTime;
139
         final String timestamp;
136
         final String timestamp;
140
-        final String deviceId;
141
 
137
 
142
-        MessageContext(String dbName, String superTable, String controllerName,
138
+        MessageContext(String controllerId, String devcieId, String controllerName,
143
                        JSONObject metricData, String currentTime,
139
                        JSONObject metricData, String currentTime,
144
                        String timestamp, String deviceId) {
140
                        String timestamp, String deviceId) {
145
-            this.dbName = dbName;
146
-            this.superTable = superTable;
141
+            this.controllerId = controllerId;
147
             this.controllerName = controllerName;
142
             this.controllerName = controllerName;
148
             this.metricData = metricData;
143
             this.metricData = metricData;
149
             this.currentTime = currentTime;
144
             this.currentTime = currentTime;

+ 8
- 8
iot-platform/src/main/java/com/iot/platform/service/SysRealtimeService.java Просмотреть файл

30
         return sysRealtimeMapper.selectTables();
30
         return sysRealtimeMapper.selectTables();
31
     }
31
     }
32
 
32
 
33
-    public void insertTables(String tableName, String createTime, String deviceId, String timestamp, String k, Object v) {
33
+    public void insertTables(String tableName, String createTime, String controllerId, String deviceId, String timestamp, String k, Object v) {
34
         validateTableName(tableName);
34
         validateTableName(tableName);
35
-        sysRealtimeMapper.insertTables(tableName, createTime, deviceId, timestamp, k, v);
35
+        sysRealtimeMapper.insertTables(tableName, createTime, controllerId, deviceId, timestamp, k, v);
36
     }
36
     }
37
 
37
 
38
-    public void updateTables(String tableName, String createTime, Object v, String timestamp, String k, String deviceId) {
38
+    public void updateTables(String tableName, String createTime, Object v, String timestamp, String k, String controllerId, String deviceId) {
39
         validateTableName(tableName);
39
         validateTableName(tableName);
40
-        sysRealtimeMapper.updateTables(tableName, createTime, v, timestamp, k, deviceId);
40
+        sysRealtimeMapper.updateTables(tableName, createTime, v, timestamp, k, controllerId, deviceId);
41
     }
41
     }
42
 
42
 
43
-    public Integer selectKey(String tableName, String k) {
43
+    public Integer selectKey(String tableName, String k, String controllerId) {
44
         validateTableName(tableName);
44
         validateTableName(tableName);
45
-        return sysRealtimeMapper.selectKey(tableName, k);
45
+        return sysRealtimeMapper.selectKey(tableName, k, controllerId);
46
     }
46
     }
47
 
47
 
48
-    public List<String> selectAllKeys(String tableName) {
48
+    public List<String> selectAllKeys(String tableName, String controllerId) {
49
         validateTableName(tableName);
49
         validateTableName(tableName);
50
-        return sysRealtimeMapper.selectAllKeys(tableName);
50
+        return sysRealtimeMapper.selectAllKeys(tableName, controllerId);
51
     }
51
     }
52
 
52
 
53
     private void validateTableName(String tableName) {
53
     private void validateTableName(String tableName) {

+ 166
- 47
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Просмотреть файл

9
 import org.springframework.stereotype.Service;
9
 import org.springframework.stereotype.Service;
10
 
10
 
11
 import java.nio.charset.StandardCharsets;
11
 import java.nio.charset.StandardCharsets;
12
+import java.math.BigDecimal;
12
 import java.sql.*;
13
 import java.sql.*;
13
 import java.time.ZoneOffset;
14
 import java.time.ZoneOffset;
14
 import java.util.*;
15
 import java.util.*;
48
     // 每批次最大列数(防止 TDengine 行超限)
49
     // 每批次最大列数(防止 TDengine 行超限)
49
     private static final int MAX_COLUMNS_PER_INSERT = 100;
50
     private static final int MAX_COLUMNS_PER_INSERT = 100;
50
 
51
 
51
-    // 默认 VARCHAR 字段长度限制
52
-    private static final int DEFAULT_VARCHAR_LENGTH = 128;
52
+    // 默认 VARCHAR 字段长度限制(用于未提供实际长度时的兜底)
53
+    private static final int DEFAULT_VARCHAR_LENGTH = 16;
54
+
55
+    // TDengine 数据保留天数(超过此天数的数据自动删除)
56
+    private static final int DATA_RETENTION_DAYS = 180;
53
 
57
 
54
     // 东八区时区偏移(避免重复创建)
58
     // 东八区时区偏移(避免重复创建)
55
     private static final ZoneOffset ZONE_OFFSET_8 = ZoneOffset.of("+8");
59
     private static final ZoneOffset ZONE_OFFSET_8 = ZoneOffset.of("+8");
126
         if (name == null || name.isEmpty()) {
130
         if (name == null || name.isEmpty()) {
127
             return "`unknown`";
131
             return "`unknown`";
128
         }
132
         }
129
-        return "`" + name.replace("`", "") + "`";
133
+        return "`" + name.replaceAll("`", "") + "`";
130
     }
134
     }
131
 
135
 
132
     private boolean isValidFieldName(String name) {
136
     private boolean isValidFieldName(String name) {
228
     // ==========================================
232
     // ==========================================
229
     // 初始化表结构(按列存储,无 ext_data)
233
     // 初始化表结构(按列存储,无 ext_data)
230
     // ==========================================
234
     // ==========================================
231
-    private void initTableStructure(String dbName, String superTableName, String table)
235
+    private void initTableStructure(String dbName, String superTableName, String table, String controllerId)
232
             throws SQLException {
236
             throws SQLException {
233
 
237
 
234
         try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
238
         try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
235
             stmt.setQueryTimeout(10);
239
             stmt.setQueryTimeout(10);
236
 
240
 
237
-            stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName));
241
+            stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName) + " KEEP " + DATA_RETENTION_DAYS);
238
 
242
 
239
-            // 创建超级表:固定 ts + surfacename,无 ext_data 列
243
+            // 创建超级表:固定 ts + controller_id 列(TDengine 要求至少 2 列)
240
             String stableSql = String.format(
244
             String stableSql = String.format(
241
-                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location VARCHAR(255))",
245
+                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, controller_id BIGINT) TAGS (location VARCHAR(64))",
242
                     wrapName(dbName),
246
                     wrapName(dbName),
243
                     wrapName(superTableName)
247
                     wrapName(superTableName)
244
             );
248
             );
248
             String key = getStableKey(dbName, superTableName);
252
             String key = getStableKey(dbName, superTableName);
249
             Set<String> fixedCols = new HashSet<>();
253
             Set<String> fixedCols = new HashSet<>();
250
             fixedCols.add("ts");
254
             fixedCols.add("ts");
251
-            fixedCols.add("surfacename");
255
+            fixedCols.add("controller_id");
252
             stableColumnCache.put(key, fixedCols);
256
             stableColumnCache.put(key, fixedCols);
253
 
257
 
254
-            // 创建子表
258
+            // 创建子表,使用 controllerId 作为 tag
255
             String tableSql = String.format(
259
             String tableSql = String.format(
256
                     "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
260
                     "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
257
                     wrapName(dbName),
261
                     wrapName(dbName),
258
                     wrapName(table),
262
                     wrapName(table),
259
                     wrapName(dbName),
263
                     wrapName(dbName),
260
                     wrapName(superTableName),
264
                     wrapName(superTableName),
261
-                    escapeValue(superTableName)
265
+                    escapeValue(controllerId)
262
             );
266
             );
263
             stmt.executeUpdate(tableSql);
267
             stmt.executeUpdate(tableSql);
264
         }
268
         }
267
     // ==========================================
271
     // ==========================================
268
     // 批量插入(按列存储)
272
     // 批量插入(按列存储)
269
     // ==========================================
273
     // ==========================================
270
-    public void insertBatch(String dbName, String superTable, String table, List<Map<String, Object>> dataList)
274
+    public void insertBatch(String dbName, String superTable, String controllerId, List<Map<String, Object>> dataList)
271
             throws SQLException {
275
             throws SQLException {
272
 
276
 
273
         if (dataList == null || dataList.isEmpty()) {
277
         if (dataList == null || dataList.isEmpty()) {
274
-            log.debug("insertBatch 收到空数据,直接返回 | dbName={}, table={}", dbName, table);
278
+            log.debug("insertBatch 收到空数据,直接返回 | dbName={}, table={}", dbName, controllerId);
275
             return;
279
             return;
276
         }
280
         }
277
 
281
 
278
         Map<String, String> columnTypes = collectColumnTypes(dataList);
282
         Map<String, String> columnTypes = collectColumnTypes(dataList);
279
 
283
 
280
         // 所有数据都走拆分逻辑:按首字符 UTF-8 值模 10 分组到不同的超级表
284
         // 所有数据都走拆分逻辑:按首字符 UTF-8 值模 10 分组到不同的超级表
281
-        splitAndInsertToMultipleStables(dbName, superTable, table, dataList, columnTypes);
285
+        splitAndInsertToMultipleStables(dbName, superTable, controllerId, dataList, columnTypes);
282
 
286
 
283
-        log.info("批量写入完成: {} | 条数: {}", table, dataList.size());
287
+        log.info("批量写入完成: {} | 条数: {}", controllerId, dataList.size());
284
     }
288
     }
285
 
289
 
286
     /**
290
     /**
287
      * 根据列 key 首字符的 UTF-8 值拆分到多个超级表
291
      * 根据列 key 首字符的 UTF-8 值拆分到多个超级表
288
      */
292
      */
289
-    private void splitAndInsertToMultipleStables(String dbName, String stableName, String tableName,
293
+    private void splitAndInsertToMultipleStables(String dbName, String stableName, String controllerId,
290
                                                  List<Map<String, Object>> dataList,
294
                                                  List<Map<String, Object>> dataList,
291
                                                  Map<String, String> columnTypes) throws SQLException {
295
                                                  Map<String, String> columnTypes) throws SQLException {
292
 
296
 
304
             int groupId = group.getKey();
308
             int groupId = group.getKey();
305
             // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
309
             // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
306
             String newStableName = stableName+"_"+groupId;
310
             String newStableName = stableName+"_"+groupId;
307
-            String newTableName = stableName+"_"+groupId+"_"+tableName;
311
+            String newTableName = stableName+"_"+groupId+"_"+controllerId;
308
 
312
 
309
             // 筛选出该组列的数据
313
             // 筛选出该组列的数据
310
             List<Map<String, Object>> filteredData = filterDataByColumnGroup(dataList, group.getValue().keySet());
314
             List<Map<String, Object>> filteredData = filterDataByColumnGroup(dataList, group.getValue().keySet());
313
             int batchSize = DEFAULT_BATCH_SIZE;
317
             int batchSize = DEFAULT_BATCH_SIZE;
314
             for (int i = 0; i < filteredData.size(); i += batchSize) {
318
             for (int i = 0; i < filteredData.size(); i += batchSize) {
315
                 List<Map<String, Object>> batch = filteredData.subList(i, Math.min(i + batchSize, filteredData.size()));
319
                 List<Map<String, Object>> batch = filteredData.subList(i, Math.min(i + batchSize, filteredData.size()));
316
-                insertBatchInternal(dbName, newStableName, newTableName, batch);
320
+                insertBatchInternal(dbName, newStableName, newTableName, controllerId, batch);
317
             }
321
             }
318
             log.info("分组插入完成: stable={}, table={}, 列数={}, 数据条数={}",
322
             log.info("分组插入完成: stable={}, table={}, 列数={}, 数据条数={}",
319
                     newStableName, newTableName, group.getValue().size(), filteredData.size());
323
                     newStableName, newTableName, group.getValue().size(), filteredData.size());
374
     }
378
     }
375
 
379
 
376
     /**
380
     /**
381
+     * 收集数据中所有动态列的最大字符串长度(用于动态 VARCHAR 长度)
382
+     */
383
+    private Map<String, Integer> collectColumnMaxLengths(List<Map<String, Object>> dataList) {
384
+        Map<String, Integer> maxLengths = new HashMap<>();
385
+        for (Map<String, Object> data : dataList) {
386
+            if (data == null) {
387
+                continue;
388
+            }
389
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
390
+                String key = entry.getKey();
391
+                if (!isValidFieldName(key) || isReservedColumn(key)) {
392
+                    continue;
393
+                }
394
+                Object value = entry.getValue();
395
+                if (value != null) {
396
+                    int len = value.toString().length();
397
+                    maxLengths.merge(key, len, Math::max);
398
+                }
399
+            }
400
+        }
401
+        log.debug("收集到的列最大长度: {}", maxLengths);
402
+        return maxLengths;
403
+    }
404
+
405
+    /**
377
      * 构建批量插入 SQL
406
      * 构建批量插入 SQL
378
      */
407
      */
379
     private String buildInsertSql(String dbName, String table, String superTableName,
408
     private String buildInsertSql(String dbName, String table, String superTableName,
380
-                                   Map<String, String> columnTypes, List<Map<String, Object>> dataList) {
381
-        return buildInsertSql(dbName, table, superTableName, columnTypes, dataList, null);
409
+                                   Map<String, String> columnTypes, List<Map<String, Object>> dataList, String controllerId) {
410
+        return buildInsertSql(dbName, table, superTableName, columnTypes, dataList, null, controllerId);
382
     }
411
     }
383
 
412
 
384
     /**
413
     /**
386
      */
415
      */
387
     private String buildInsertSql(String dbName, String table, String superTableName,
416
     private String buildInsertSql(String dbName, String table, String superTableName,
388
                                    Map<String, String> columnTypes, List<Map<String, Object>> dataList,
417
                                    Map<String, String> columnTypes, List<Map<String, Object>> dataList,
389
-                                   Long customTs) {
418
+                                   Long customTs, String controllerId) {
390
         if (columnTypes.isEmpty()) {
419
         if (columnTypes.isEmpty()) {
391
             return null;
420
             return null;
392
         }
421
         }
399
 
428
 
400
         StringBuilder sql = new StringBuilder();
429
         StringBuilder sql = new StringBuilder();
401
         sql.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
430
         sql.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
402
-           .append(" (ts, surfacename");
431
+           .append(" (ts, controller_id");
403
 
432
 
404
         for (String col : columnTypes.keySet()) {
433
         for (String col : columnTypes.keySet()) {
405
             sql.append(", ").append(wrapName(col));
434
             sql.append(", ").append(wrapName(col));
412
                 continue;
441
                 continue;
413
             }
442
             }
414
 
443
 
415
-            sql.append("(").append(tsValue).append(", '").append(escapeValue(superTableName)).append("'");
444
+            sql.append("(").append(tsValue).append(", '").append(escapeValue(controllerId)).append("'");
416
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
445
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
417
                 String col = entry.getKey();
446
                 String col = entry.getKey();
418
                 Object value = data.get(col);
447
                 Object value = data.get(col);
435
     /**
464
     /**
436
      * 内部方法:插入一批数据(支持一次重试)
465
      * 内部方法:插入一批数据(支持一次重试)
437
      */
466
      */
438
-    private void insertBatchInternal(String dbName, String superTableName, String table,
467
+    private void insertBatchInternal(String dbName, String superTableName, String table,String controllerId,
439
                                      List<Map<String, Object>> dataList) throws SQLException {
468
                                      List<Map<String, Object>> dataList) throws SQLException {
440
-        insertBatchInternal(dbName, superTableName, table, dataList, false);
469
+        insertBatchInternal(dbName, superTableName, table, controllerId,dataList,false);
441
     }
470
     }
442
 
471
 
443
-    private void insertBatchInternal(String dbName, String superTableName, String table,
472
+    private void insertBatchInternal(String dbName, String superTableName, String table,String controllerId,
444
                                      List<Map<String, Object>> dataList, boolean isRetry) throws SQLException {
473
                                      List<Map<String, Object>> dataList, boolean isRetry) throws SQLException {
445
 
474
 
446
-        ensureTableExists(dbName, superTableName, table);
475
+        ensureTableExists(dbName, superTableName, table,controllerId);
447
 
476
 
448
         Map<String, String> columnTypes = collectColumnTypes(dataList);
477
         Map<String, String> columnTypes = collectColumnTypes(dataList);
449
         if (columnTypes.isEmpty()) {
478
         if (columnTypes.isEmpty()) {
451
             return;
480
             return;
452
         }
481
         }
453
 
482
 
483
+        Map<String, Integer> columnMaxLengths = collectColumnMaxLengths(dataList);
484
+
454
         log.info("收集到的列类型: {}", columnTypes);
485
         log.info("收集到的列类型: {}", columnTypes);
455
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
486
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
456
         log.info("超级表已有列: {}", existingColumns);
487
         log.info("超级表已有列: {}", existingColumns);
468
             return;
499
             return;
469
         }
500
         }
470
 
501
 
471
-        ensureColumnsExist(dbName, superTableName, columnTypes);
502
+        ensureColumnsExist(dbName, superTableName, columnTypes, columnMaxLengths);
472
 
503
 
473
         existingColumns = getStableColumns(dbName, superTableName);
504
         existingColumns = getStableColumns(dbName, superTableName);
474
         if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
505
         if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
477
             return;
508
             return;
478
         }
509
         }
479
 
510
 
480
-        String sql = buildInsertSql(dbName, table, superTableName, columnTypes, dataList);
511
+        String sql = buildInsertSql(dbName, table, superTableName, columnTypes, dataList, controllerId);
481
         if (sql == null) {
512
         if (sql == null) {
482
             log.warn("insertBatchInternal SQL 构建为空,跳过插入 | dbName={}, table={}", dbName, table);
513
             log.warn("insertBatchInternal SQL 构建为空,跳过插入 | dbName={}, table={}", dbName, table);
483
             return;
514
             return;
503
                         subColumnTypes.put(col, columnTypes.get(col));
534
                         subColumnTypes.put(col, columnTypes.get(col));
504
                     }
535
                     }
505
 
536
 
506
-                    String subSql = buildInsertSql(dbName, table, superTableName, subColumnTypes, dataList, unifiedTs);
537
+                    String subSql = buildInsertSql(dbName, table, superTableName, subColumnTypes, dataList, unifiedTs, controllerId);
507
                     if (subSql != null) {
538
                     if (subSql != null) {
508
                         stmt.executeUpdate(subSql);
539
                         stmt.executeUpdate(subSql);
509
                         insertedCount += dataList.size();
540
                         insertedCount += dataList.size();
517
             if (!isRetry && e.getMessage().contains("Table does not exist")) {
548
             if (!isRetry && e.getMessage().contains("Table does not exist")) {
518
                 log.warn("表不存在,重建并重试: {}", table);
549
                 log.warn("表不存在,重建并重试: {}", table);
519
                 clearStableColumnCache();
550
                 clearStableColumnCache();
520
-                initTableStructure(dbName, superTableName, table);
521
-                insertBatchInternal(dbName, superTableName, table, dataList, true);
551
+                initTableStructure(dbName, superTableName, table, controllerId);
552
+                insertBatchInternal(dbName, superTableName, table, controllerId, dataList, true);
522
                 return;
553
                 return;
523
             }
554
             }
524
             throw new SQLException("批量写入 SQL 失败: " + table + " | 错误: " + e.getMessage(), e);
555
             throw new SQLException("批量写入 SQL 失败: " + table + " | 错误: " + e.getMessage(), e);
526
     }
557
     }
527
 
558
 
528
     /**
559
     /**
529
-     * 判断是否为保留列(ts, surfacename
560
+     * 判断是否为保留列(ts)
530
      */
561
      */
531
     private boolean isReservedColumn(String columnName) {
562
     private boolean isReservedColumn(String columnName) {
532
-        return "ts".equalsIgnoreCase(columnName) || "surfacename".equalsIgnoreCase(columnName);
563
+        return "ts".equalsIgnoreCase(columnName) || "controller_id".equalsIgnoreCase(columnName);
533
     }
564
     }
534
 
565
 
535
     /**
566
     /**
545
         if (value instanceof Integer || value instanceof Long) {
576
         if (value instanceof Integer || value instanceof Long) {
546
             return "BIGINT";
577
             return "BIGINT";
547
         }
578
         }
548
-        if (value instanceof Float || value instanceof Double) {
579
+        if (value instanceof Float || value instanceof Double || value instanceof BigDecimal) {
549
             return "DOUBLE";
580
             return "DOUBLE";
550
         }
581
         }
551
         // 时间类型(Date, Timestamp, LocalDateTime 等)
582
         // 时间类型(Date, Timestamp, LocalDateTime 等)
559
     /**
590
     /**
560
      * 根据 TdEngine 列类型获取创建列的 SQL 类型
591
      * 根据 TdEngine 列类型获取创建列的 SQL 类型
561
      */
592
      */
562
-    private String getColumnTypeForDDL(String tdType) {
593
+    private String getColumnTypeForDDL(String tdType, Integer maxLen) {
563
         switch (tdType) {
594
         switch (tdType) {
564
             case "BOOL":
595
             case "BOOL":
565
             case "BIGINT":
596
             case "BIGINT":
568
                 return tdType;
599
                 return tdType;
569
             case "VARCHAR":
600
             case "VARCHAR":
570
             default:
601
             default:
571
-                return "VARCHAR(" + DEFAULT_VARCHAR_LENGTH + ")";
602
+                int len = (maxLen != null && maxLen > 0) ? maxLen + 5 : DEFAULT_VARCHAR_LENGTH;
603
+                return "VARCHAR(" + len + ")";
572
         }
604
         }
573
     }
605
     }
574
 
606
 
612
             }
644
             }
613
         }
645
         }
614
 
646
 
615
-        // 字符串类型校验长度
616
-        if (strValue.length() > DEFAULT_VARCHAR_LENGTH) {
617
-            log.debug("字段值超长,截断存储 | 列: {} | 值长度: {} | 最大: {} | 截断后: {}...",
618
-                    columnName, strValue.length(), DEFAULT_VARCHAR_LENGTH, strValue.substring(0, DEFAULT_VARCHAR_LENGTH));
619
-            strValue = strValue.substring(0, DEFAULT_VARCHAR_LENGTH);
620
-        }
647
+        // 字符串类型:使用实际值长度 + 5 冗余,不截断
648
+        // (TDengine VARCHAR 实际限制在创建列时已保证充足)
621
         return "'" + escapeValue(strValue) + "'";
649
         return "'" + escapeValue(strValue) + "'";
622
     }
650
     }
623
 
651
 
624
     /**
652
     /**
625
      * 确保列存在,如有新列则 ALTER 添加(根据类型添加对应列)
653
      * 确保列存在,如有新列则 ALTER 添加(根据类型添加对应列)
626
      */
654
      */
627
-    private void ensureColumnsExist(String dbName, String superTableName, Map<String, String> columnTypes)
628
-            throws SQLException {
655
+    private void ensureColumnsExist(String dbName, String superTableName, Map<String, String> columnTypes,
656
+                                     Map<String, Integer> columnMaxLengths) throws SQLException {
629
 
657
 
630
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
658
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
631
 
659
 
641
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
669
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
642
                 String col = entry.getKey();
670
                 String col = entry.getKey();
643
                 String colType = entry.getValue();
671
                 String colType = entry.getValue();
672
+                Integer maxLen = columnMaxLengths != null ? columnMaxLengths.get(col) : null;
644
 
673
 
645
                 // 再次检查列数上限
674
                 // 再次检查列数上限
646
                 if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
675
                 if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
650
                 }
679
                 }
651
 
680
 
652
                 if (!existingColumns.contains(col)) {
681
                 if (!existingColumns.contains(col)) {
653
-                    String ddlType = getColumnTypeForDDL(colType);
682
+                    String ddlType = getColumnTypeForDDL(colType, maxLen);
654
                     String alterSql = String.format(
683
                     String alterSql = String.format(
655
                             "ALTER STABLE %s.%s ADD COLUMN %s %s",
684
                             "ALTER STABLE %s.%s ADD COLUMN %s %s",
656
                             wrapName(dbName),
685
                             wrapName(dbName),
684
     /**
713
     /**
685
      * 确保表存在
714
      * 确保表存在
686
      */
715
      */
687
-    private void ensureTableExists(String dbName, String superTableName, String table)
716
+    private void ensureTableExists(String dbName, String superTableName, String table,String controllerId)
688
             throws SQLException {
717
             throws SQLException {
689
 
718
 
690
         if (!isValidTableName(dbName) || !isValidTableName(superTableName) || !isValidTableName(table)) {
719
         if (!isValidTableName(dbName) || !isValidTableName(superTableName) || !isValidTableName(table)) {
702
                 try (ResultSet rs = pStmt.executeQuery()) {
731
                 try (ResultSet rs = pStmt.executeQuery()) {
703
                     if (!rs.next()) {
732
                     if (!rs.next()) {
704
                         log.info("超级表不存在,创建: {}.{}", dbName, superTableName);
733
                         log.info("超级表不存在,创建: {}.{}", dbName, superTableName);
705
-                        initTableStructure(dbName, superTableName, table);
734
+                        initTableStructure(dbName, superTableName, table, controllerId);
706
                         return;
735
                         return;
707
                     }
736
                     }
708
                 }
737
                 }
747
         log.info("清除了 TdEngine 超级表结构缓存");
776
         log.info("清除了 TdEngine 超级表结构缓存");
748
     }
777
     }
749
 
778
 
779
+    /**
780
+     * 根据 controllerId、deviceId 查询 TDengine 最新数据
781
+     * @param controllerId 控制器ID
782
+     * @param deviceId 设备ID
783
+     * @param key 列名,不传则返回所有列
784
+     */
785
+    public Object queryLatestData(String controllerId, String deviceId, String key) throws SQLException {
786
+        // 构建 dbName 前缀
787
+        String dbNamePrefix = "pe_iot_" + controllerId.substring(0, 2);
788
+
789
+        // 遍历 10 个分组查找数据
790
+        for (int groupId = 0; groupId < 10; groupId++) {
791
+            // 表名格式: {deviceId}_{groupId}_{controllerId} (与 insertBatch 保持一致)
792
+            String tableName = deviceId + "_" + groupId + "_" + controllerId;
793
+
794
+            // 检查表是否存在
795
+            if (!isTableExists(dbNamePrefix, tableName)) {
796
+                continue;
797
+            }
798
+
799
+            // 构建查询 SQL
800
+            String colPart = (key != null && !key.trim().isEmpty()) ? wrapName(key) : "*";
801
+            String sql = String.format("SELECT %s FROM %s.%s ORDER BY ts DESC LIMIT 1",
802
+                    colPart, wrapName(dbNamePrefix), wrapName(tableName));
803
+
804
+            try (Connection conn = getConnection();
805
+                 Statement stmt = conn.createStatement();
806
+                 ResultSet rs = stmt.executeQuery(sql)) {
807
+
808
+                if (rs.next()) {
809
+                    if (key != null && !key.trim().isEmpty()) {
810
+                        // 返回指定列的值
811
+                        return rs.getObject(1);
812
+                    } else {
813
+                        // 返回整行数据
814
+                        java.util.Map<String, Object> row = new java.util.LinkedHashMap<>();
815
+                        ResultSetMetaData metaData = rs.getMetaData();
816
+                        int columnCount = metaData.getColumnCount();
817
+                        for (int i = 1; i <= columnCount; i++) {
818
+                            row.put(metaData.getColumnLabel(i), rs.getObject(i));
819
+                        }
820
+                        return row;
821
+                    }
822
+                }
823
+            } catch (SQLException e) {
824
+                // 列不存在,继续查找下一个分组
825
+                if (e.getMessage().contains("Invalid column")) {
826
+                    log.debug("表 {} 中无列 {},继续查找其他分组", tableName, key);
827
+                    continue;
828
+                }
829
+                throw e;
830
+            }
831
+        }
832
+        return null;
833
+    }
834
+
835
+    /**
836
+     * 检查表是否存在
837
+     */
838
+    private boolean isTableExists(String dbName, String tableName) throws SQLException {
839
+        // 先检查数据库是否存在
840
+        String checkDbSql = "SHOW DATABASES";
841
+        try (Connection conn = getConnection();
842
+             Statement stmt = conn.createStatement()) {
843
+            stmt.setQueryTimeout(5);
844
+            boolean dbExists = false;
845
+            try (ResultSet rs = stmt.executeQuery(checkDbSql)) {
846
+                while (rs.next()) {
847
+                    if (dbName.equals(rs.getString(1))) {
848
+                        dbExists = true;
849
+                        break;
850
+                    }
851
+                }
852
+            }
853
+            if (!dbExists) {
854
+                return false;
855
+            }
856
+            // 检查表是否存在
857
+            String sql = String.format("SHOW %s.TABLES", dbName);
858
+            try (ResultSet rs = stmt.executeQuery(sql)) {
859
+                while (rs.next()) {
860
+                    if (tableName.equals(rs.getString(1))) {
861
+                        return true;
862
+                    }
863
+                }
864
+            }
865
+            return false;
866
+        }
867
+    }
868
+
750
     public void close() {
869
     public void close() {
751
         log.info("关闭 TdEngine 服务...");
870
         log.info("关闭 TdEngine 服务...");
752
         if (dataSource != null) {
871
         if (dataSource != null) {

+ 135
- 0
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Просмотреть файл

1
+package com.iot.platform.task;
2
+
3
+import com.iot.platform.domain.SysCar;
4
+import com.iot.platform.domain.SysDevice;
5
+import com.iot.platform.domain.SysDeviceControl;
6
+import com.iot.platform.config.IotProperties;
7
+import com.iot.platform.service.*;
8
+import org.slf4j.Logger;
9
+import org.slf4j.LoggerFactory;
10
+import org.springframework.dao.DataAccessException;
11
+import org.springframework.data.redis.RedisConnectionFailureException;
12
+import org.springframework.data.redis.core.RedisCallback;
13
+import org.springframework.data.redis.core.ScanOptions;
14
+import org.springframework.data.redis.core.StringRedisTemplate;
15
+import org.springframework.scheduling.annotation.Scheduled;
16
+import org.springframework.stereotype.Component;
17
+import org.springframework.web.client.RestClientException;
18
+import org.springframework.web.client.RestTemplate;
19
+
20
+import java.util.*;
21
+import java.util.concurrent.TimeUnit;
22
+
23
+@Component
24
+public class VehicleSyncTask {
25
+
26
+    private static final Logger log = LoggerFactory.getLogger(VehicleSyncTask.class);
27
+
28
+    private final StringRedisTemplate stringRedisTemplate;
29
+    private final SysRealtimeService sysrealtimeService;
30
+
31
+    public VehicleSyncTask(StringRedisTemplate stringRedisTemplate,
32
+                           SysRealtimeService sysrealtimeService) {
33
+
34
+        this.stringRedisTemplate = stringRedisTemplate;
35
+        this.sysrealtimeService = sysrealtimeService;
36
+    }
37
+
38
+    private boolean tryLock(String lockKey, long expireSeconds) {
39
+        Boolean acquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", expireSeconds, TimeUnit.SECONDS);
40
+        return Boolean.TRUE.equals(acquired);
41
+    }
42
+
43
+    private void unlock(String lockKey) {
44
+        Boolean deleted = stringRedisTemplate.delete(lockKey);
45
+        if (!Boolean.TRUE.equals(deleted)) {
46
+            log.warn("分布式锁释放失败: {}", lockKey);
47
+        }
48
+    }
49
+
50
+    /**
51
+     * 更新数据库实时数据
52
+     */
53
+    @Scheduled(fixedDelay = 30000)
54
+    public void syncRedisToMySQL() {
55
+        String lockKey = "lock:vehicle-sync:syncRedisToMySQL";
56
+        if (!tryLock(lockKey, 60)) {
57
+            log.debug("获取锁失败,跳过本次执行: {}", lockKey);
58
+            return;
59
+        }
60
+        try {
61
+            doSyncRedisToMySQL();
62
+        } finally {
63
+            unlock(lockKey);
64
+        }
65
+    }
66
+
67
+    private void doSyncRedisToMySQL() {
68
+        Set<String> activeKeys = stringRedisTemplate.opsForSet().members("iot:dsb:active:devices");
69
+        if (activeKeys == null || activeKeys.isEmpty()) return;
70
+
71
+        for (String redisKey : activeKeys) {
72
+            try {
73
+                Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
74
+                if (dataMap == null || dataMap.isEmpty()) {
75
+                    stringRedisTemplate.opsForSet().remove("iot:dsb:active:devices", redisKey);
76
+                    continue;
77
+                }
78
+
79
+                // redisKey 格式: pe_iot_<controllerId>_<deviceId>
80
+                // 从右往左取,避免 id 本身含下划线的问题
81
+                String[] parts = redisKey.split("_");
82
+                if (parts.length < 3) {
83
+                    log.warn("跳过非法 redis key: {}", redisKey);
84
+                    continue;
85
+                }
86
+                String controllerId = parts[parts.length - 2];
87
+                String deviceId = parts[parts.length - 1];
88
+                String tableName = "pe_iot_" + controllerId.substring(0, 2);
89
+
90
+                try {
91
+                    sysrealtimeService.createRealtime(tableName);
92
+                } catch (Exception e) {
93
+                    log.error("创建表失败: {} | {}", tableName, e.getMessage(), e);
94
+                    continue;
95
+                }
96
+
97
+                String createTime = getStringValue(dataMap, "createTime");
98
+                String timestamp = getStringValue(dataMap, "timestamp");
99
+
100
+                if (createTime == null || timestamp == null) {
101
+                    continue;
102
+                }
103
+
104
+                List<String> existingKeys = sysrealtimeService.selectAllKeys(tableName, controllerId);
105
+                Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
106
+
107
+                for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
108
+                    String fieldKey = entry.getKey().toString();
109
+                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey) || "controller_id".equals(fieldKey)) {
110
+                        continue;
111
+                    }
112
+                    String fieldValue = getStringValue(dataMap, fieldKey);
113
+                    if (fieldValue == null) continue;
114
+
115
+                    if (existingKeySet.contains(fieldKey)) {
116
+                        sysrealtimeService.updateTables(tableName, createTime, fieldValue, timestamp, fieldKey, controllerId, deviceId);
117
+                    } else {
118
+                        sysrealtimeService.insertTables(tableName, createTime, controllerId, deviceId, timestamp, fieldKey, fieldValue);
119
+                    }
120
+                }
121
+            } catch (RedisConnectionFailureException e) {
122
+                log.error("Redis 连接失败: {} | {}", redisKey, e.getMessage());
123
+            } catch (DataAccessException e) {
124
+                log.error("数据库操作失败: {} | {}", redisKey, e.getMessage(), e);
125
+            } catch (Exception e) {
126
+                log.error("同步设备失败: {} | {}", redisKey, e.getMessage(), e);
127
+            }
128
+        }
129
+    }
130
+
131
+    private String getStringValue(Map<Object, Object> map, String key) {
132
+        Object val = map.get(key);
133
+        return val == null ? null : val.toString().trim();
134
+    }
135
+}

+ 8
- 4
iot-platform/src/main/resources/mapper/SysRealtimeMapper.xml Просмотреть файл

14
     <update id="createRealtime">
14
     <update id="createRealtime">
15
         CREATE TABLE IF NOT EXISTS `${tableName}` (
15
         CREATE TABLE IF NOT EXISTS `${tableName}` (
16
             create_time VARCHAR(255) NOT NULL COMMENT '时间戳',
16
             create_time VARCHAR(255) NOT NULL COMMENT '时间戳',
17
+            controller_id VARCHAR(255) NOT NULL COMMENT '控制器id',
17
             device_id VARCHAR(255) NOT NULL COMMENT '设备id',
18
             device_id VARCHAR(255) NOT NULL COMMENT '设备id',
18
             timestamp VARCHAR(255) NOT NULL COMMENT '时间戳',
19
             timestamp VARCHAR(255) NOT NULL COMMENT '时间戳',
19
             k VARCHAR(255) NOT NULL COMMENT 'key',
20
             k VARCHAR(255) NOT NULL COMMENT 'key',
20
             v VARCHAR(255) NOT NULL COMMENT '值',
21
             v VARCHAR(255) NOT NULL COMMENT '值',
22
+            INDEX idx_controller_id (controller_id),
23
+            INDEX idx_controller_device (controller_id, device_id),
24
+            INDEX idx_controller_k (controller_id, k),
21
             INDEX idx_device_id (device_id),
25
             INDEX idx_device_id (device_id),
22
             INDEX idx_device_create_time (device_id, create_time),
26
             INDEX idx_device_create_time (device_id, create_time),
23
             INDEX idx_k (k)
27
             INDEX idx_k (k)
28
         SHOW TABLES;
32
         SHOW TABLES;
29
     </select>
33
     </select>
30
     <insert id="insertTables">
34
     <insert id="insertTables">
31
-        insert into `${tableName}`(create_time,device_id,timestamp,k,v) values(#{createTime},#{deviceId},#{timestamp},#{k},#{v})
35
+        insert into `${tableName}`(create_time,controller_id,device_id,timestamp,k,v) values(#{createTime},#{controllerId},#{deviceId},#{timestamp},#{k},#{v})
32
     </insert>
36
     </insert>
33
     <update id="updateTables">
37
     <update id="updateTables">
34
-        update `${tableName}` set create_time=#{createTime},v=#{v},timestamp=#{timestamp} where k=#{k} and device_id=#{deviceId}
38
+        update `${tableName}` set create_time=#{createTime},v=#{v},timestamp=#{timestamp} where k=#{k} and device_id=#{deviceId} and controller_id=#{controllerId}
35
     </update>
39
     </update>
36
 
40
 
37
     <select id="selectKey" resultType="Integer">
41
     <select id="selectKey" resultType="Integer">
38
-        select COUNT(*) from `${tableName}` where k=#{k}
42
+        select COUNT(*) from `${tableName}` where k=#{k} and controller_id=#{controllerId}
39
     </select>
43
     </select>
40
 
44
 
41
     <select id="selectAllKeys" resultType="String">
45
     <select id="selectAllKeys" resultType="String">
42
-        SELECT k FROM `${tableName}`
46
+        SELECT k FROM `${tableName}` WHERE controller_id=#{controllerId}
43
     </select>
47
     </select>
44
 </mapper>
48
 </mapper>

Загрузка…
Отмена
Сохранить