Просмотр исходного кода

Merge remote-tracking branch 'origin/mqy20260511'

mqy20260511
humanleft 2 недель назад
Родитель
Сommit
a2d0c7d0d6

+ 1
- 1
iot-platform/pom.xml Просмотреть файл

198
                         </configuration>
198
                         </configuration>
199
                     </execution>
199
                     </execution>
200
                 </executions>
200
                 </executions>
201
-            </plugin>
201
+            </plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin>
202
         </plugins>
202
         </plugins>
203
         <finalName>${project.artifactId}</finalName>
203
         <finalName>${project.artifactId}</finalName>
204
     </build>
204
     </build>

+ 1
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Просмотреть файл

73
         String superTable = topicParts[3];
73
         String superTable = topicParts[3];
74
         LocalDate date = LocalDate.now();
74
         LocalDate date = LocalDate.now();
75
         String tableName = superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
75
         String tableName = superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
76
-        tdengineService.insertBatch(dbName, tableName, batchToInsert);
76
+        tdengineService.insertBatch(dbName, superTable,tableName, batchToInsert);
77
     }
77
     }
78
 }
78
 }

+ 3
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Просмотреть файл

110
         String tableName = ctx.superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
110
         String tableName = ctx.superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
111
 
111
 
112
         List<Map<String, Object>> batch = Collections.singletonList(data);
112
         List<Map<String, Object>> batch = Collections.singletonList(data);
113
-        tdengineService.insertBatch(ctx.dbName, tableName, batch);
113
+
114
+//        log.info("数据:({})",batch);
115
+        tdengineService.insertBatch(ctx.dbName, ctx.superTable,tableName, batch);
114
     }
116
     }
115
 
117
 
116
     private void writeToRedis(MessageContext ctx) {
118
     private void writeToRedis(MessageContext ctx) {

+ 30
- 13
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Просмотреть файл

32
     private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
32
     private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
33
     private static final int MAX_CACHE_SIZE = 1000;
33
     private static final int MAX_CACHE_SIZE = 1000;
34
 
34
 
35
-    // 允许的列名字符(仅 ASCII 字母、数字、下划线)
36
-    private static final String ALLOWED_COLUMNS = "^[a-zA-Z_][a-zA-Z0-9_]*$";
35
+    // 允许的列名字符(支持中文、ASCII字母、数字、下划线)
36
+    private static final String ALLOWED_COLUMNS = "^[a-zA-Z_一-龥][a-zA-Z0-9_一-龥]*$";
37
 
37
 
38
     // 允许的表名/数据库名字符(允许数字开头和连字符,如 UUID 格式的表名)
38
     // 允许的表名/数据库名字符(允许数字开头和连字符,如 UUID 格式的表名)
39
     private static final String ALLOWED_TABLE_NAME = "^[a-zA-Z0-9][a-zA-Z0-9_-]*$";
39
     private static final String ALLOWED_TABLE_NAME = "^[a-zA-Z0-9][a-zA-Z0-9_-]*$";
122
         if (name == null || name.isEmpty()) {
122
         if (name == null || name.isEmpty()) {
123
             return "`unknown`";
123
             return "`unknown`";
124
         }
124
         }
125
-        return "`" + name.replace("`", "") + "`";
125
+        return "`" + name.replaceAll("`", "") + "`";
126
     }
126
     }
127
 
127
 
128
     private boolean isValidFieldName(String name) {
128
     private boolean isValidFieldName(String name) {
133
         return name != null && name.matches(ALLOWED_TABLE_NAME);
133
         return name != null && name.matches(ALLOWED_TABLE_NAME);
134
     }
134
     }
135
 
135
 
136
+    private boolean isNumeric(String str) {
137
+        if (str == null || str.isEmpty()) {
138
+            return false;
139
+        }
140
+        try {
141
+            Double.parseDouble(str);
142
+            return true;
143
+        } catch (NumberFormatException e) {
144
+            return false;
145
+        }
146
+    }
147
+
136
     // === 缓存工具方法 ===
148
     // === 缓存工具方法 ===
137
     private String getStableKey(String dbName, String stableName) {
149
     private String getStableKey(String dbName, String stableName) {
138
         return dbName + "." + stableName;
150
         return dbName + "." + stableName;
196
 
208
 
197
             // 创建超级表:固定 ts + surfacename,无 ext_data 列
209
             // 创建超级表:固定 ts + surfacename,无 ext_data 列
198
             String stableSql = String.format(
210
             String stableSql = String.format(
199
-                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location BINARY(64))",
211
+                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location VARCHAR(255))",
200
                     wrapName(dbName),
212
                     wrapName(dbName),
201
                     wrapName(superTableName)
213
                     wrapName(superTableName)
202
             );
214
             );
225
     // ==========================================
237
     // ==========================================
226
     // 批量插入(按列存储)
238
     // 批量插入(按列存储)
227
     // ==========================================
239
     // ==========================================
228
-    public void insertBatch(String dbName, String table, List<Map<String, Object>> dataList)
240
+    public void insertBatch(String dbName, String superTable,String table, List<Map<String, Object>> dataList)
229
             throws SQLException {
241
             throws SQLException {
230
 
242
 
231
         if (dataList == null || dataList.isEmpty()) {
243
         if (dataList == null || dataList.isEmpty()) {
233
             return;
245
             return;
234
         }
246
         }
235
 
247
 
236
-        String superTableName = extractSuperTableName(table);
248
+//        String superTableName = extractSuperTableName(table);
237
 
249
 
238
         int batchSize = DEFAULT_BATCH_SIZE;
250
         int batchSize = DEFAULT_BATCH_SIZE;
239
         for (int i = 0; i < dataList.size(); i += batchSize) {
251
         for (int i = 0; i < dataList.size(); i += batchSize) {
240
             List<Map<String, Object>> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size()));
252
             List<Map<String, Object>> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size()));
241
-            insertBatchInternal(dbName, superTableName, table, batch);
253
+            insertBatchInternal(dbName, superTable, table, batch);
242
         }
254
         }
243
 
255
 
244
         log.info("批量写入成功: {} | 条数: {}", table, dataList.size());
256
         log.info("批量写入成功: {} | 条数: {}", table, dataList.size());
261
             for (Map.Entry<String, Object> entry : data.entrySet()) {
273
             for (Map.Entry<String, Object> entry : data.entrySet()) {
262
                 String key = entry.getKey();
274
                 String key = entry.getKey();
263
                 if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
275
                 if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
264
-                    columnTypes.put(key, getValueType(entry.getValue()));
276
+                    // 统一使用 VARCHAR,避免类型推断导致的问题
277
+                    columnTypes.put(key, "VARCHAR");
265
                 }
278
                 }
266
             }
279
             }
267
         }
280
         }
281
+
282
+        log.debug("收集到的列: {} | 共 {} 列", columnTypes.keySet(), columnTypes.size());
268
         return columnTypes;
283
         return columnTypes;
269
     }
284
     }
270
 
285
 
305
         if (!hasData) {
320
         if (!hasData) {
306
             return null;
321
             return null;
307
         }
322
         }
308
-
323
+        log.info("生成的 INSERT SQL | 列类型: {} | SQL 前100字符: {}", columnTypes, sql.toString().substring(0, Math.min(100, sql.toString().length())));
309
         sql.setLength(sql.length() - 1);
324
         sql.setLength(sql.length() - 1);
310
         return sql.toString();
325
         return sql.toString();
311
     }
326
     }
329
             return;
344
             return;
330
         }
345
         }
331
 
346
 
347
+        log.info("收集到的列类型: {}", columnTypes);
332
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
348
         Set<String> existingColumns = getStableColumns(dbName, superTableName);
349
+        log.info("超级表已有列: {}", existingColumns);
333
         // 计算实际新增列数(扣除已有列的交集)
350
         // 计算实际新增列数(扣除已有列的交集)
334
         int newColumns = 0;
351
         int newColumns = 0;
335
         for (String col : columnTypes.keySet()) {
352
         for (String col : columnTypes.keySet()) {
542
         }
559
         }
543
 
560
 
544
         try (Connection conn = getConnection()) {
561
         try (Connection conn = getConnection()) {
545
-            // 检查超级表是否存在 — 使用 PreparedStatement 防止 SQL 注入
546
-            String checkStableSql = "SELECT * FROM information_schema.ins_tables WHERE table_name = ? AND db_name = ?";
562
+            // 检查超级表是否存在(stable_name 字段标识超级表)
563
+            String checkStableSql = "SELECT * FROM information_schema.ins_tables WHERE stable_name = ? AND db_name = ?";
547
             try (PreparedStatement pStmt = conn.prepareStatement(checkStableSql)) {
564
             try (PreparedStatement pStmt = conn.prepareStatement(checkStableSql)) {
548
                 pStmt.setString(1, superTableName);
565
                 pStmt.setString(1, superTableName);
549
                 pStmt.setString(2, dbName);
566
                 pStmt.setString(2, dbName);
557
                 }
574
                 }
558
             }
575
             }
559
 
576
 
560
-            // 检查子表是否存在 — 使用 PreparedStatement 防止 SQL 注入
577
+            // 检查子表是否存在
561
             String checkTableSql = "SELECT * FROM information_schema.ins_tables " +
578
             String checkTableSql = "SELECT * FROM information_schema.ins_tables " +
562
-                    "WHERE table_name = ? AND db_name = ? AND table_type = 'CHILD_TABLE'";
579
+                    "WHERE table_name = ? AND db_name = ? AND type = 'CHILD_TABLE'";
563
             try (PreparedStatement pStmt = conn.prepareStatement(checkTableSql)) {
580
             try (PreparedStatement pStmt = conn.prepareStatement(checkTableSql)) {
564
                 pStmt.setString(1, table);
581
                 pStmt.setString(1, table);
565
                 pStmt.setString(2, dbName);
582
                 pStmt.setString(2, dbName);

+ 3
- 3
iot-platform/src/test/java/com/iot/platform/mqtt/MqttChargeStationConsumerTest.java Просмотреть файл

101
 
101
 
102
         method.invoke(consumer, dataList, topic);
102
         method.invoke(consumer, dataList, topic);
103
 
103
 
104
-        verify(tdengineService).insertBatch(eq("myDb"), eq(expectedTable), anyList());
104
+        verify(tdengineService).insertBatch(eq("myDb"), "mySuperTable_",eq(expectedTable), anyList());
105
     }
105
     }
106
 
106
 
107
     @Test
107
     @Test
113
 
113
 
114
         method.invoke(consumer, Collections.emptyList(), "prefix/db/suffix/table/extra");
114
         method.invoke(consumer, Collections.emptyList(), "prefix/db/suffix/table/extra");
115
 
115
 
116
-        verify(tdengineService, never()).insertBatch(anyString(), anyString(), anyList());
116
+        verify(tdengineService, never()).insertBatch(anyString(), "mySuperTable_",anyString(), anyList());
117
     }
117
     }
118
 
118
 
119
     @Test
119
     @Test
151
 
151
 
152
         method.invoke(consumer, dataList, topic);
152
         method.invoke(consumer, dataList, topic);
153
 
153
 
154
-        verify(tdengineService).insertBatch(eq("myDb"), eq(expectedTable), argThat(list -> list.size() == 1));
154
+        verify(tdengineService).insertBatch(eq("myDb"), "mySuperTable_",eq(expectedTable), argThat(list -> list.size() == 1));
155
     }
155
     }
156
 }
156
 }

+ 2
- 2
iot-platform/src/test/java/com/iot/platform/service/TDengineServiceTest.java Просмотреть файл

305
     @Test
305
     @Test
306
     @DisplayName("insertBatch: 空列表应直接返回不抛异常")
306
     @DisplayName("insertBatch: 空列表应直接返回不抛异常")
307
     void insertBatch_emptyList_returnsWithoutError() throws Exception {
307
     void insertBatch_emptyList_returnsWithoutError() throws Exception {
308
-        tdengineService.insertBatch("db", "table", Collections.emptyList());
309
-        tdengineService.insertBatch("db", "table", null);
308
+        tdengineService.insertBatch("db", "supertable","table", Collections.emptyList());
309
+        tdengineService.insertBatch("db", "supertable","table", null);
310
     }
310
     }
311
 
311
 
312
     @Test
312
     @Test

Загрузка…
Отмена
Сохранить