Преглед на файлове

Merge branch 'mqy20260511' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511

 Conflicts:
	iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java
	iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java
mqy20260511
lenovo преди 2 седмици
родител
ревизия
10fedc0a39

+ 1
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Целия файл

@@ -79,6 +79,6 @@ public class MqttChargeStationConsumer extends AbstractDynamicMqttConsumer {
79 79
 //        superTable + "_" +
80 80
 //        int i = ((int) "g".getBytes("UTF-8")[0]) % 10;
81 81
 
82
-        tdengineService.insertBatch(dbNamePrefix, superTable,tableName, batchToInsert);
82
+        tdengineService.insertBatch(dbNamePrefix, superTable, tableName, batchToInsert);
83 83
     }
84 84
 }

+ 2
- 2
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Целия файл

@@ -107,8 +107,8 @@ public class MqttDynamicConsumer extends AbstractDynamicMqttConsumer {
107 107
         List<Map<String, Object>> batch = Collections.singletonList(data);
108 108
 
109 109
         String dbNamePrefix = "pe_iot_"+ctx.dbName.substring(0, 2);
110
-        String tableName = ctx.dbName;
111
-        tdengineService.insertBatch(dbNamePrefix, ctx.superTable,tableName, batch);
110
+        String controllerId = ctx.dbName;
111
+        tdengineService.insertBatch(dbNamePrefix, ctx.superTable, controllerId, batch);
112 112
     }
113 113
 
114 114
     private void writeToRedis(MessageContext ctx) {

+ 32
- 31
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Целия файл

@@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
8 8
 import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.stereotype.Service;
10 10
 
11
+import java.math.BigDecimal;
11 12
 import java.sql.*;
12 13
 import java.time.ZoneOffset;
13 14
 import java.util.*;
@@ -230,7 +231,7 @@ public class TdEngineService {
230 231
     // ==========================================
231 232
     // 初始化表结构(按列存储,无 ext_data)
232 233
     // ==========================================
233
-    private void initTableStructure(String dbName, String superTableName, String table)
234
+    private void initTableStructure(String dbName, String superTableName, String table, String controllerId)
234 235
             throws SQLException {
235 236
 
236 237
         try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
@@ -238,9 +239,9 @@ public class TdEngineService {
238 239
 
239 240
             stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName));
240 241
 
241
-            // 创建超级表:固定 ts + surfacename,无 ext_data 列
242
+            // 创建超级表:固定 ts + controller_id 列(TDengine 要求至少 2 列)
242 243
             String stableSql = String.format(
243
-                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location VARCHAR(255))",
244
+                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, controller_id BIGINT) TAGS (location VARCHAR(64))",
244 245
                     wrapName(dbName),
245 246
                     wrapName(superTableName)
246 247
             );
@@ -250,17 +251,17 @@ public class TdEngineService {
250 251
             String key = getStableKey(dbName, superTableName);
251 252
             Set<String> fixedCols = new HashSet<>();
252 253
             fixedCols.add("ts");
253
-            fixedCols.add("surfacename");
254
+            fixedCols.add("controller_id");
254 255
             stableColumnCache.put(key, fixedCols);
255 256
 
256
-            // 创建子表
257
+            // 创建子表,使用 controllerId 作为 tag
257 258
             String tableSql = String.format(
258 259
                     "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
259 260
                     wrapName(dbName),
260 261
                     wrapName(table),
261 262
                     wrapName(dbName),
262 263
                     wrapName(superTableName),
263
-                    escapeValue(superTableName)
264
+                    escapeValue(controllerId)
264 265
             );
265 266
             stmt.executeUpdate(tableSql);
266 267
         }
@@ -269,26 +270,26 @@ public class TdEngineService {
269 270
     // ==========================================
270 271
     // 批量插入(按列存储)
271 272
     // ==========================================
272
-    public void insertBatch(String dbName, String superTable, String table, List<Map<String, Object>> dataList)
273
+    public void insertBatch(String dbName, String superTable, String controllerId, List<Map<String, Object>> dataList)
273 274
             throws SQLException {
274 275
 
275 276
         if (dataList == null || dataList.isEmpty()) {
276
-            log.debug("insertBatch 收到空数据,直接返回 | dbName={}, table={}", dbName, table);
277
+            log.debug("insertBatch 收到空数据,直接返回 | dbName={}, table={}", dbName, controllerId);
277 278
             return;
278 279
         }
279 280
 
280 281
         Map<String, String> columnTypes = collectColumnTypes(dataList);
281 282
 
282 283
         // 所有数据都走拆分逻辑:按首字符 UTF-8 值模 10 分组到不同的超级表
283
-        splitAndInsertToMultipleStables(dbName, superTable, table, dataList, columnTypes);
284
+        splitAndInsertToMultipleStables(dbName, superTable, controllerId, dataList, columnTypes);
284 285
 
285
-        log.info("批量写入完成: {} | 条数: {}", table, dataList.size());
286
+        log.info("批量写入完成: {} | 条数: {}", controllerId, dataList.size());
286 287
     }
287 288
 
288 289
     /**
289 290
      * 根据列 key 首字符的 UTF-8 值拆分到多个超级表
290 291
      */
291
-    private void splitAndInsertToMultipleStables(String dbName, String stableName, String tableName,
292
+    private void splitAndInsertToMultipleStables(String dbName, String stableName, String controllerId,
292 293
                                                  List<Map<String, Object>> dataList,
293 294
                                                  Map<String, String> columnTypes) throws SQLException {
294 295
         // 按首字符 UTF-8 值分组
@@ -305,7 +306,7 @@ public class TdEngineService {
305 306
             int groupId = group.getKey();
306 307
             // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
307 308
             String newStableName = stableName+"_"+groupId;
308
-            String newTableName = stableName+"_"+groupId+"_"+tableName;
309
+            String newTableName = stableName+"_"+groupId+"_"+controllerId;
309 310
 
310 311
             // 筛选出该组列的数据
311 312
             List<Map<String, Object>> filteredData = filterDataByColumnGroup(dataList, group.getValue().keySet());
@@ -314,7 +315,7 @@ public class TdEngineService {
314 315
             int batchSize = DEFAULT_BATCH_SIZE;
315 316
             for (int i = 0; i < filteredData.size(); i += batchSize) {
316 317
                 List<Map<String, Object>> batch = filteredData.subList(i, Math.min(i + batchSize, filteredData.size()));
317
-                insertBatchInternal(dbName, newStableName, newTableName, batch);
318
+                insertBatchInternal(dbName, newStableName, newTableName, controllerId, batch);
318 319
             }
319 320
             log.info("分组插入完成: stable={}, table={}, 列数={}, 数据条数={}",
320 321
                     newStableName, newTableName, group.getValue().size(), filteredData.size());
@@ -381,8 +382,8 @@ public class TdEngineService {
381 382
      * 构建批量插入 SQL
382 383
      */
383 384
     private String buildInsertSql(String dbName, String table, String superTableName,
384
-                                   Map<String, String> columnTypes, List<Map<String, Object>> dataList) {
385
-        return buildInsertSql(dbName, table, superTableName, columnTypes, dataList, null);
385
+                                   Map<String, String> columnTypes, List<Map<String, Object>> dataList, String controllerId) {
386
+        return buildInsertSql(dbName, table, superTableName, columnTypes, dataList, null, controllerId);
386 387
     }
387 388
 
388 389
     /**
@@ -390,7 +391,7 @@ public class TdEngineService {
390 391
      */
391 392
     private String buildInsertSql(String dbName, String table, String superTableName,
392 393
                                    Map<String, String> columnTypes, List<Map<String, Object>> dataList,
393
-                                   Long customTs) {
394
+                                   Long customTs, String controllerId) {
394 395
         if (columnTypes.isEmpty()) {
395 396
             return null;
396 397
         }
@@ -403,7 +404,7 @@ public class TdEngineService {
403 404
 
404 405
         StringBuilder sql = new StringBuilder();
405 406
         sql.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
406
-           .append(" (ts, surfacename");
407
+           .append(" (ts, controller_id");
407 408
 
408 409
         for (String col : columnTypes.keySet()) {
409 410
             sql.append(", ").append(wrapName(col));
@@ -416,7 +417,7 @@ public class TdEngineService {
416 417
                 continue;
417 418
             }
418 419
 
419
-            sql.append("(").append(tsValue).append(", '").append(escapeValue(superTableName)).append("'");
420
+            sql.append("(").append(tsValue).append(", '").append(escapeValue(controllerId)).append("'");
420 421
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
421 422
                 String col = entry.getKey();
422 423
                 Object value = data.get(col);
@@ -439,15 +440,15 @@ public class TdEngineService {
439 440
     /**
440 441
      * 内部方法:插入一批数据(支持一次重试)
441 442
      */
442
-    private void insertBatchInternal(String dbName, String superTableName, String table,
443
+    private void insertBatchInternal(String dbName, String superTableName, String table,String controllerId,
443 444
                                      List<Map<String, Object>> dataList) throws SQLException {
444
-        insertBatchInternal(dbName, superTableName, table, dataList, false);
445
+        insertBatchInternal(dbName, superTableName, table, controllerId,dataList,false);
445 446
     }
446 447
 
447
-    private void insertBatchInternal(String dbName, String superTableName, String table,
448
+    private void insertBatchInternal(String dbName, String superTableName, String table,String controllerId,
448 449
                                      List<Map<String, Object>> dataList, boolean isRetry) throws SQLException {
449 450
 
450
-        ensureTableExists(dbName, superTableName, table);
451
+        ensureTableExists(dbName, superTableName, table,controllerId);
451 452
 
452 453
         Map<String, String> columnTypes = collectColumnTypes(dataList);
453 454
         if (columnTypes.isEmpty()) {
@@ -481,7 +482,7 @@ public class TdEngineService {
481 482
             return;
482 483
         }
483 484
 
484
-        String sql = buildInsertSql(dbName, table, superTableName, columnTypes, dataList);
485
+        String sql = buildInsertSql(dbName, table, superTableName, columnTypes, dataList, controllerId);
485 486
         if (sql == null) {
486 487
             log.warn("insertBatchInternal SQL 构建为空,跳过插入 | dbName={}, table={}", dbName, table);
487 488
             return;
@@ -507,7 +508,7 @@ public class TdEngineService {
507 508
                         subColumnTypes.put(col, columnTypes.get(col));
508 509
                     }
509 510
 
510
-                    String subSql = buildInsertSql(dbName, table, superTableName, subColumnTypes, dataList, unifiedTs);
511
+                    String subSql = buildInsertSql(dbName, table, superTableName, subColumnTypes, dataList, unifiedTs, controllerId);
511 512
                     if (subSql != null) {
512 513
                         stmt.executeUpdate(subSql);
513 514
                         insertedCount += dataList.size();
@@ -521,8 +522,8 @@ public class TdEngineService {
521 522
             if (!isRetry && e.getMessage().contains("Table does not exist")) {
522 523
                 log.warn("表不存在,重建并重试: {}", table);
523 524
                 clearStableColumnCache();
524
-                initTableStructure(dbName, superTableName, table);
525
-                insertBatchInternal(dbName, superTableName, table, dataList, true);
525
+                initTableStructure(dbName, superTableName, table, controllerId);
526
+                insertBatchInternal(dbName, superTableName, table, controllerId, dataList, true);
526 527
                 return;
527 528
             }
528 529
             throw new SQLException("批量写入 SQL 失败: " + table + " | 错误: " + e.getMessage(), e);
@@ -530,10 +531,10 @@ public class TdEngineService {
530 531
     }
531 532
 
532 533
     /**
533
-     * 判断是否为保留列(ts, surfacename
534
+     * 判断是否为保留列(ts)
534 535
      */
535 536
     private boolean isReservedColumn(String columnName) {
536
-        return "ts".equalsIgnoreCase(columnName) || "surfacename".equalsIgnoreCase(columnName);
537
+        return "ts".equalsIgnoreCase(columnName) || "controller_id".equalsIgnoreCase(columnName);
537 538
     }
538 539
 
539 540
     /**
@@ -549,7 +550,7 @@ public class TdEngineService {
549 550
         if (value instanceof Integer || value instanceof Long) {
550 551
             return "BIGINT";
551 552
         }
552
-        if (value instanceof Float || value instanceof Double) {
553
+        if (value instanceof Float || value instanceof Double || value instanceof BigDecimal) {
553 554
             return "DOUBLE";
554 555
         }
555 556
         // 时间类型(Date, Timestamp, LocalDateTime 等)
@@ -688,7 +689,7 @@ public class TdEngineService {
688 689
     /**
689 690
      * 确保表存在
690 691
      */
691
-    private void ensureTableExists(String dbName, String superTableName, String table)
692
+    private void ensureTableExists(String dbName, String superTableName, String table,String controllerId)
692 693
             throws SQLException {
693 694
 
694 695
         if (!isValidTableName(dbName) || !isValidTableName(superTableName) || !isValidTableName(table)) {
@@ -706,7 +707,7 @@ public class TdEngineService {
706 707
                 try (ResultSet rs = pStmt.executeQuery()) {
707 708
                     if (!rs.next()) {
708 709
                         log.info("超级表不存在,创建: {}.{}", dbName, superTableName);
709
-                        initTableStructure(dbName, superTableName, table);
710
+                        initTableStructure(dbName, superTableName, table, controllerId);
710 711
                         return;
711 712
                     }
712 713
                 }

+ 149
- 0
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Целия файл

@@ -0,0 +1,149 @@
1
+package com.iot.platform.task;
2
+
3
+import com.iot.platform.domain.SysCar;
4
+import com.iot.platform.domain.SysDevice;
5
+import com.iot.platform.domain.SysDeviceControl;
6
+import com.iot.platform.config.IotProperties;
7
+import com.iot.platform.service.*;
8
+import org.slf4j.Logger;
9
+import org.slf4j.LoggerFactory;
10
+import org.springframework.dao.DataAccessException;
11
+import org.springframework.data.redis.RedisConnectionFailureException;
12
+import org.springframework.data.redis.core.RedisCallback;
13
+import org.springframework.data.redis.core.ScanOptions;
14
+import org.springframework.data.redis.core.StringRedisTemplate;
15
+import org.springframework.scheduling.annotation.Scheduled;
16
+import org.springframework.stereotype.Component;
17
+import org.springframework.web.client.RestClientException;
18
+import org.springframework.web.client.RestTemplate;
19
+
20
+import java.util.*;
21
+import java.util.concurrent.TimeUnit;
22
+
23
+@Component
24
+public class VehicleSyncTask {
25
+
26
+    private static final Logger log = LoggerFactory.getLogger(VehicleSyncTask.class);
27
+
28
+    private final SysCarService sysCarService;
29
+    private final SysDeviceService sysDeviceService;
30
+    private final StringRedisTemplate stringRedisTemplate;
31
+    private final SysRealtimeService sysrealtimeService;
32
+
33
+    private final SysDeviceVoService sysDeviceVoService;
34
+    private final SysDeviceControlService sysDeviceControlService;
35
+    private final RestTemplate restTemplate;
36
+    private final IotProperties iotProperties;
37
+
38
+    public VehicleSyncTask(SysCarService sysCarService,
39
+                           SysDeviceService sysDeviceService,
40
+                           StringRedisTemplate stringRedisTemplate,
41
+                           SysRealtimeService sysrealtimeService,
42
+                           SysDeviceVoService sysDeviceVoService,
43
+                           SysDeviceControlService sysDeviceControlService,
44
+                           RestTemplate restTemplate,
45
+                           IotProperties iotProperties) {
46
+        this.sysCarService = sysCarService;
47
+        this.sysDeviceService = sysDeviceService;
48
+        this.stringRedisTemplate = stringRedisTemplate;
49
+        this.sysrealtimeService = sysrealtimeService;
50
+        this.sysDeviceVoService = sysDeviceVoService;
51
+        this.sysDeviceControlService = sysDeviceControlService;
52
+        this.restTemplate = restTemplate;
53
+        this.iotProperties = iotProperties;
54
+    }
55
+
56
+    private boolean tryLock(String lockKey, long expireSeconds) {
57
+        Boolean acquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", expireSeconds, TimeUnit.SECONDS);
58
+        return Boolean.TRUE.equals(acquired);
59
+    }
60
+
61
+    private void unlock(String lockKey) {
62
+        Boolean deleted = stringRedisTemplate.delete(lockKey);
63
+        if (!Boolean.TRUE.equals(deleted)) {
64
+            log.warn("分布式锁释放失败: {}", lockKey);
65
+        }
66
+    }
67
+
68
+    /**
69
+     * 更新数据库实时数据
70
+     */
71
+    @Scheduled(fixedDelay = 30000)
72
+    public void syncRedisToMySQL() {
73
+        String lockKey = "lock:vehicle-sync:syncRedisToMySQL";
74
+        if (!tryLock(lockKey, 60)) {
75
+            log.debug("获取锁失败,跳过本次执行: {}", lockKey);
76
+            return;
77
+        }
78
+        try {
79
+            doSyncRedisToMySQL();
80
+        } finally {
81
+            unlock(lockKey);
82
+        }
83
+    }
84
+
85
+    private void doSyncRedisToMySQL() {
86
+        Set<String> activeKeys = stringRedisTemplate.opsForSet().members("DSB:active:devices");
87
+        if (activeKeys == null || activeKeys.isEmpty()) return;
88
+
89
+        for (String redisKey : activeKeys) {
90
+            try {
91
+                Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
92
+                if (dataMap == null || dataMap.isEmpty()) {
93
+                    stringRedisTemplate.opsForSet().remove("DSB:active:devices", redisKey);
94
+                    continue;
95
+                }
96
+
97
+                String[] parts = redisKey.split(":", 3);
98
+                if (parts.length != 3 || !"DSB".equals(parts[0])) {
99
+                    log.warn("跳过非法 key: {}", redisKey);
100
+                    continue;
101
+                }
102
+                String controllerId = parts[1];
103
+
104
+                try {
105
+                    sysrealtimeService.createRealtime(controllerId);
106
+                } catch (Exception e) {
107
+                    log.error("创建表失败: {} | {}", controllerId, e.getMessage(), e);
108
+                    continue;
109
+                }
110
+
111
+                String createTime = getStringValue(dataMap, "createTime");
112
+                String timestamp = getStringValue(dataMap, "timestamp");
113
+                String deviceId = getStringValue(dataMap, "device_id");
114
+                if (createTime == null || timestamp == null || deviceId == null) {
115
+                    continue;
116
+                }
117
+
118
+                List<String> existingKeys = sysrealtimeService.selectAllKeys(controllerId);
119
+                Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
120
+
121
+                for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
122
+                    String fieldKey = entry.getKey().toString();
123
+                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey)) {
124
+                        continue;
125
+                    }
126
+                    String fieldValue = getStringValue(dataMap, fieldKey);
127
+                    if (fieldValue == null) continue;
128
+
129
+                    if (existingKeySet.contains(fieldKey)) {
130
+                        sysrealtimeService.updateTables(controllerId, createTime, fieldValue, timestamp, fieldKey, deviceId);
131
+                    } else {
132
+                        sysrealtimeService.insertTables(controllerId, createTime, deviceId, timestamp, fieldKey, fieldValue);
133
+                    }
134
+                }
135
+            } catch (RedisConnectionFailureException e) {
136
+                log.error("Redis 连接失败: {} | {}", redisKey, e.getMessage());
137
+            } catch (DataAccessException e) {
138
+                log.error("数据库操作失败: {} | {}", redisKey, e.getMessage(), e);
139
+            } catch (Exception e) {
140
+                log.error("同步设备失败: {} | {}", redisKey, e.getMessage(), e);
141
+            }
142
+        }
143
+    }
144
+
145
+    private String getStringValue(Map<Object, Object> map, String key) {
146
+        Object val = map.get(key);
147
+        return val == null ? null : val.toString().trim();
148
+    }
149
+}

Loading…
Отказ
Запис