humanleft пре 2 недеља
родитељ
комит
4ccc79ca3e

+ 0
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Прегледај датотеку

12
 import org.springframework.data.redis.core.StringRedisTemplate;
12
 import org.springframework.data.redis.core.StringRedisTemplate;
13
 import org.springframework.stereotype.Component;
13
 import org.springframework.stereotype.Component;
14
 
14
 
15
-import java.time.LocalDate;
16
 import java.time.LocalDateTime;
15
 import java.time.LocalDateTime;
17
 import java.time.format.DateTimeFormatter;
16
 import java.time.format.DateTimeFormatter;
18
 import java.util.*;
17
 import java.util.*;

+ 14
- 18
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Прегледај датотеку

8
 import org.springframework.beans.factory.annotation.Autowired;
8
 import org.springframework.beans.factory.annotation.Autowired;
9
 import org.springframework.stereotype.Service;
9
 import org.springframework.stereotype.Service;
10
 
10
 
11
+import java.nio.charset.StandardCharsets;
11
 import java.sql.*;
12
 import java.sql.*;
12
 import java.time.ZoneOffset;
13
 import java.time.ZoneOffset;
13
 import java.util.*;
14
 import java.util.*;
47
     // 每批次最大列数(防止 TDengine 行超限)
48
     // 每批次最大列数(防止 TDengine 行超限)
48
     private static final int MAX_COLUMNS_PER_INSERT = 100;
49
     private static final int MAX_COLUMNS_PER_INSERT = 100;
49
 
50
 
50
-    // 拆分超级表的列数阈值
51
-    private static final int SPLIT_STABLE_COLUMN_THRESHOLD = 100;
52
-
53
     // 默认 VARCHAR 字段长度限制
51
     // 默认 VARCHAR 字段长度限制
54
     private static final int DEFAULT_VARCHAR_LENGTH = 128;
52
     private static final int DEFAULT_VARCHAR_LENGTH = 128;
55
 
53
 
128
         if (name == null || name.isEmpty()) {
126
         if (name == null || name.isEmpty()) {
129
             return "`unknown`";
127
             return "`unknown`";
130
         }
128
         }
131
-        return "`" + name.replaceAll("`", "") + "`";
129
+        return "`" + name.replace("`", "") + "`";
132
     }
130
     }
133
 
131
 
134
     private boolean isValidFieldName(String name) {
132
     private boolean isValidFieldName(String name) {
291
     private void splitAndInsertToMultipleStables(String dbName, String stableName, String tableName,
289
     private void splitAndInsertToMultipleStables(String dbName, String stableName, String tableName,
292
                                                  List<Map<String, Object>> dataList,
290
                                                  List<Map<String, Object>> dataList,
293
                                                  Map<String, String> columnTypes) throws SQLException {
291
                                                  Map<String, String> columnTypes) throws SQLException {
292
+
294
         // 按首字符 UTF-8 值分组
293
         // 按首字符 UTF-8 值分组
295
-        Map<Integer, Map<String, String>> groupColumnTypes = new LinkedHashMap<>();
294
+        Map<Integer, Map<String, String>> groupColumnTypeMap = new LinkedHashMap<>();
296
         for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
295
         for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
297
             int groupId = getFirstCharGroupId(entry.getKey());
296
             int groupId = getFirstCharGroupId(entry.getKey());
298
-            groupColumnTypes.computeIfAbsent(groupId, k -> new LinkedHashMap<>()).put(entry.getKey(), entry.getValue());
297
+            groupColumnTypeMap.computeIfAbsent(groupId, k -> new LinkedHashMap<>()).put(entry.getKey(), entry.getValue());
299
         }
298
         }
300
 
299
 
301
-        log.info("拆分后超级表数量: {} | 分组: {}", groupColumnTypes.size(), groupColumnTypes.keySet());
300
+        log.info("拆分后超级表数量: {} | 分组: {}", groupColumnTypeMap.size(), groupColumnTypeMap.keySet());
302
 
301
 
303
         // 每个分组作为一个超级表,处理其对应的列和数据
302
         // 每个分组作为一个超级表,处理其对应的列和数据
304
-        for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypes.entrySet()) {
303
+        for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypeMap.entrySet()) {
305
             int groupId = group.getKey();
304
             int groupId = group.getKey();
306
             // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
305
             // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
307
             String newStableName = stableName+"_"+groupId;
306
             String newStableName = stableName+"_"+groupId;
329
             return 0;
328
             return 0;
330
         }
329
         }
331
         String firstChar = columnName.substring(0, 1);
330
         String firstChar = columnName.substring(0, 1);
332
-        byte[] bytes = firstChar.getBytes(java.nio.charset.StandardCharsets.UTF_8);
331
+        byte[] bytes = firstChar.getBytes(StandardCharsets.UTF_8);
333
         return (bytes[0] & 0xFF) % 10;
332
         return (bytes[0] & 0xFF) % 10;
334
     }
333
     }
335
 
334
 
339
     private List<Map<String, Object>> filterDataByColumnGroup(List<Map<String, Object>> dataList, Set<String> columns) {
338
     private List<Map<String, Object>> filterDataByColumnGroup(List<Map<String, Object>> dataList, Set<String> columns) {
340
         List<Map<String, Object>> filtered = new ArrayList<>();
339
         List<Map<String, Object>> filtered = new ArrayList<>();
341
         for (Map<String, Object> data : dataList) {
340
         for (Map<String, Object> data : dataList) {
342
-            if (data == null) continue;
341
+            if (data == null) {
342
+                continue;
343
+            }
343
             Map<String, Object> filteredRow = new LinkedHashMap<>();
344
             Map<String, Object> filteredRow = new LinkedHashMap<>();
344
             for (String col : columns) {
345
             for (String col : columns) {
345
                 if (data.containsKey(col)) {
346
                 if (data.containsKey(col)) {
351
         return filtered;
352
         return filtered;
352
     }
353
     }
353
 
354
 
354
-    private String extractSuperTableName(String table) {
355
-        int idx = table.lastIndexOf('_');
356
-        return idx > 0 ? table.substring(0, idx) : table;
357
-    }
358
-
359
     /**
355
     /**
360
      * 收集数据中所有动态列及其类型
356
      * 收集数据中所有动态列及其类型
361
      */
357
      */
431
         if (!hasData) {
427
         if (!hasData) {
432
             return null;
428
             return null;
433
         }
429
         }
434
-        log.info("生成的 INSERT SQL | 列类型: {} | SQL 前100字符: {}", columnTypes, sql.toString().substring(0, Math.min(100, sql.toString().length())));
430
+        log.info("生成的 INSERT SQL | 列类型: {} | SQL 前100字符: {}", columnTypes, sql.substring(0, Math.min(100, sql.toString().length())));
435
         sql.setLength(sql.length() - 1);
431
         sql.setLength(sql.length() - 1);
436
         return sql.toString();
432
         return sql.toString();
437
     }
433
     }
563
     /**
559
     /**
564
      * 根据 TdEngine 列类型获取创建列的 SQL 类型
560
      * 根据 TdEngine 列类型获取创建列的 SQL 类型
565
      */
561
      */
566
-    private String getColumnTypeForDDL(String tdType, String columnName) {
562
+    private String getColumnTypeForDDL(String tdType) {
567
         switch (tdType) {
563
         switch (tdType) {
568
             case "BOOL":
564
             case "BOOL":
569
             case "BIGINT":
565
             case "BIGINT":
654
                 }
650
                 }
655
 
651
 
656
                 if (!existingColumns.contains(col)) {
652
                 if (!existingColumns.contains(col)) {
657
-                    String ddlType = getColumnTypeForDDL(colType, col);
653
+                    String ddlType = getColumnTypeForDDL(colType);
658
                     String alterSql = String.format(
654
                     String alterSql = String.format(
659
                             "ALTER STABLE %s.%s ADD COLUMN %s %s",
655
                             "ALTER STABLE %s.%s ADD COLUMN %s %s",
660
                             wrapName(dbName),
656
                             wrapName(dbName),

Loading…
Откажи
Сачувај