Просмотр исходного кода

Merge branch 'mqy20260511' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511

 Conflicts:
	iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java
mqy20260511
lenovo 2 недель назад
Родитель
Сommit
a21ed3c069

+ 3
- 3
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Просмотреть файл

70
         }
70
         }
71
 
71
 
72
         String dbName = topicParts[1];
72
         String dbName = topicParts[1];
73
-        String dbNamePrefix = dbName.substring(0, 2);
73
+        String dbNamePrefix = "pe_iot_"+dbName.substring(0, 2);
74
         //先传输为deviceid,到后面进行优化
74
         //先传输为deviceid,到后面进行优化
75
         String superTable = topicParts[3];
75
         String superTable = topicParts[3];
76
 //        String tableName = superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
76
 //        String tableName = superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
77
                            //deviceid_controllerid
77
                            //deviceid_controllerid
78
-        String tableName = superTable + "_" +dbName;
79
-
78
+        String tableName = dbName;
79
+//        superTable + "_" +
80
 //        int i = ((int) "g".getBytes("UTF-8")[0]) % 10;
80
 //        int i = ((int) "g".getBytes("UTF-8")[0]) % 10;
81
 
81
 
82
         tdengineService.insertBatch(dbNamePrefix, superTable,tableName, batchToInsert);
82
         tdengineService.insertBatch(dbNamePrefix, superTable,tableName, batchToInsert);

+ 3
- 7
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Просмотреть файл

104
         data.put("createTime", ctx.currentTime);
104
         data.put("createTime", ctx.currentTime);
105
         data.put("timestamp", ctx.timestamp);
105
         data.put("timestamp", ctx.timestamp);
106
         data.put("device_id", ctx.deviceId);
106
         data.put("device_id", ctx.deviceId);
107
-
108
-        LocalDate date = LocalDate.now();
109
-        String tableName = ctx.superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
110
-
111
         List<Map<String, Object>> batch = Collections.singletonList(data);
107
         List<Map<String, Object>> batch = Collections.singletonList(data);
112
 
108
 
113
-
114
-//        log.info("数据:({})",batch);
115
-        tdengineService.insertBatch(ctx.dbName, ctx.superTable,tableName, batch);
109
+        String dbNamePrefix = "pe_iot_"+ctx.dbName.substring(0, 2);
110
+        String tableName = ctx.dbName;
111
+        tdengineService.insertBatch(dbNamePrefix, ctx.superTable,tableName, batch);
116
     }
112
     }
117
 
113
 
118
     private void writeToRedis(MessageContext ctx) {
114
     private void writeToRedis(MessageContext ctx) {

+ 7
- 6
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Просмотреть файл

288
     /**
288
     /**
289
      * 根据列 key 首字符的 UTF-8 值拆分到多个超级表
289
      * 根据列 key 首字符的 UTF-8 值拆分到多个超级表
290
      */
290
      */
291
-    private void splitAndInsertToMultipleStables(String dbName, String baseSuperTable, String baseTable,
291
+    private void splitAndInsertToMultipleStables(String dbName, String stableName, String tableName,
292
                                                  List<Map<String, Object>> dataList,
292
                                                  List<Map<String, Object>> dataList,
293
                                                  Map<String, String> columnTypes) throws SQLException {
293
                                                  Map<String, String> columnTypes) throws SQLException {
294
         // 按首字符 UTF-8 值分组
294
         // 按首字符 UTF-8 值分组
303
         // 每个分组作为一个超级表,处理其对应的列和数据
303
         // 每个分组作为一个超级表,处理其对应的列和数据
304
         for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypes.entrySet()) {
304
         for (Map.Entry<Integer, Map<String, String>> group : groupColumnTypes.entrySet()) {
305
             int groupId = group.getKey();
305
             int groupId = group.getKey();
306
-            String stableName = "device_" + groupId;
307
-            String tableName = baseTable + "_" + groupId;
306
+            // 超级表名 = superTableName + groupId (例如 charge + 3 = charge3)
307
+            String newStableName = stableName+"_"+groupId;
308
+            String newTableName = stableName+"_"+groupId+"_"+tableName;
308
 
309
 
309
             // 筛选出该组列的数据
310
             // 筛选出该组列的数据
310
             List<Map<String, Object>> filteredData = filterDataByColumnGroup(dataList, group.getValue().keySet());
311
             List<Map<String, Object>> filteredData = filterDataByColumnGroup(dataList, group.getValue().keySet());
313
             int batchSize = DEFAULT_BATCH_SIZE;
314
             int batchSize = DEFAULT_BATCH_SIZE;
314
             for (int i = 0; i < filteredData.size(); i += batchSize) {
315
             for (int i = 0; i < filteredData.size(); i += batchSize) {
315
                 List<Map<String, Object>> batch = filteredData.subList(i, Math.min(i + batchSize, filteredData.size()));
316
                 List<Map<String, Object>> batch = filteredData.subList(i, Math.min(i + batchSize, filteredData.size()));
316
-                insertBatchInternal(dbName, stableName, tableName, batch);
317
+                insertBatchInternal(dbName, newStableName, newTableName, batch);
317
             }
318
             }
318
             log.info("分组插入完成: stable={}, table={}, 列数={}, 数据条数={}",
319
             log.info("分组插入完成: stable={}, table={}, 列数={}, 数据条数={}",
319
-                    stableName, tableName, group.getValue().size(), filteredData.size());
320
+                    newStableName, newTableName, group.getValue().size(), filteredData.size());
320
         }
321
         }
321
     }
322
     }
322
 
323
 
329
         }
330
         }
330
         String firstChar = columnName.substring(0, 1);
331
         String firstChar = columnName.substring(0, 1);
331
         byte[] bytes = firstChar.getBytes(java.nio.charset.StandardCharsets.UTF_8);
332
         byte[] bytes = firstChar.getBytes(java.nio.charset.StandardCharsets.UTF_8);
332
-        return (int) bytes[0] % 10;
333
+        return (bytes[0] & 0xFF) % 10;
333
     }
334
     }
334
 
335
 
335
     /**
336
     /**

Загрузка…
Отмена
Сохранить