Przeglądaj źródła

Merge remote-tracking branch 'origin/mqy20260511'

mqy20260511
humanleft 2 tygodni temu
rodzic
commit
350327912e

+ 46
- 3
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Wyświetl plik

@@ -195,6 +195,44 @@ public class TdEngineService {
195 195
         return columns;
196 196
     }
197 197
 
198
+    // 缓存超级表列类型 (key = dbName.stableName, value = Map<列名, 类型>)
199
+    private final Map<String, Map<String, String>> stableColumnTypeCache = new ConcurrentHashMap<>();
200
+
201
+    /**
202
+     * 获取超级表列类型缓存
203
+     */
204
+    private Map<String, String> getStableColumnTypes(String dbName, String stableName) {
205
+        String key = getStableKey(dbName, stableName);
206
+        Map<String, String> cached = stableColumnTypeCache.get(key);
207
+        if (cached != null) {
208
+            return cached;
209
+        }
210
+        return stableColumnTypeCache.computeIfAbsent(key, k -> loadStableColumnTypesFromDB(dbName, stableName));
211
+    }
212
+
213
+    /**
214
+     * 从数据库加载列类型
215
+     */
216
+    private Map<String, String> loadStableColumnTypesFromDB(String dbName, String stableName) {
217
+        Map<String, String> columnTypes = new HashMap<>();
218
+        String sql = String.format("DESCRIBE %s.%s", wrapName(dbName), wrapName(stableName));
219
+        try (Connection conn = getConnection();
220
+             Statement stmt = conn.createStatement();
221
+             ResultSet rs = stmt.executeQuery(sql)) {
222
+            stmt.setQueryTimeout(5);
223
+            while (rs.next()) {
224
+                String colName = rs.getString(1);
225
+                String colType = rs.getString(2);
226
+                columnTypes.put(colName, colType);
227
+            }
228
+        } catch (SQLException e) {
229
+            if (!e.getMessage().contains("Table does not exist") && !e.getMessage().contains("invalid name")) {
230
+                log.debug("查询超级表列类型失败: {}.{} | {}", dbName, stableName, e.getMessage());
231
+            }
232
+        }
233
+        return columnTypes;
234
+    }
235
+
198 236
     // ==========================================
199 237
     // 初始化表结构(按列存储,无 ext_data)
200 238
     // ==========================================
@@ -273,8 +311,7 @@ public class TdEngineService {
273 311
             for (Map.Entry<String, Object> entry : data.entrySet()) {
274 312
                 String key = entry.getKey();
275 313
                 if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
276
-                    // 统一使用 VARCHAR,避免类型推断导致的问题
277
-                    columnTypes.put(key, "VARCHAR");
314
+                    columnTypes.put(key, getValueType(entry.getValue()));
278 315
                 }
279 316
             }
280 317
         }
@@ -292,6 +329,9 @@ public class TdEngineService {
292 329
             return null;
293 330
         }
294 331
 
332
+        // 获取数据库中实际的列类型,用于格式化值
333
+        Map<String, String> dbColumnTypes = getStableColumnTypes(dbName, superTableName);
334
+
295 335
         StringBuilder sql = new StringBuilder();
296 336
         sql.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
297 337
            .append(" (ts, surfacename");
@@ -311,7 +351,9 @@ public class TdEngineService {
311 351
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
312 352
                 String col = entry.getKey();
313 353
                 Object value = data.get(col);
314
-                sql.append(", ").append(formatValue(value, col, entry.getValue()));
354
+                // 优先使用数据库中的列类型,其次使用数据类型推断
355
+                String colType = dbColumnTypes.getOrDefault(col, entry.getValue());
356
+                sql.append(", ").append(formatValue(value, col, colType));
315 357
             }
316 358
             sql.append("),");
317 359
             hasData = true;
@@ -609,6 +651,7 @@ public class TdEngineService {
609 651
      */
610 652
     public void clearStableColumnCache() {
611 653
         stableColumnCache.clear();
654
+        stableColumnTypeCache.clear();
612 655
         log.info("清除了 TdEngine 超级表结构缓存");
613 656
     }
614 657
 

Ładowanie…
Anuluj
Zapisz