Преглед изворни кода

Merge branch 'mqy20260511' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511

mqy20260511
lenovo пре 2 недеља
родитељ
комит
609e23ff9d
1 измењених фајлова са 46 додато и 3 уклоњено
  1. 46
    3
      iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java

+ 46
- 3
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Прегледај датотеку

195
         return columns;
195
         return columns;
196
     }
196
     }
197
 
197
 
198
+    // 缓存超级表列类型 (key = dbName.stableName, value = Map<列名, 类型>)
199
+    private final Map<String, Map<String, String>> stableColumnTypeCache = new ConcurrentHashMap<>();
200
+
201
+    /**
202
+     * 获取超级表列类型缓存
203
+     */
204
+    private Map<String, String> getStableColumnTypes(String dbName, String stableName) {
205
+        String key = getStableKey(dbName, stableName);
206
+        Map<String, String> cached = stableColumnTypeCache.get(key);
207
+        if (cached != null) {
208
+            return cached;
209
+        }
210
+        return stableColumnTypeCache.computeIfAbsent(key, k -> loadStableColumnTypesFromDB(dbName, stableName));
211
+    }
212
+
213
+    /**
214
+     * 从数据库加载列类型
215
+     */
216
+    private Map<String, String> loadStableColumnTypesFromDB(String dbName, String stableName) {
217
+        Map<String, String> columnTypes = new HashMap<>();
218
+        String sql = String.format("DESCRIBE %s.%s", wrapName(dbName), wrapName(stableName));
219
+        try (Connection conn = getConnection();
220
+             Statement stmt = conn.createStatement();
221
+             ResultSet rs = stmt.executeQuery(sql)) {
222
+            stmt.setQueryTimeout(5);
223
+            while (rs.next()) {
224
+                String colName = rs.getString(1);
225
+                String colType = rs.getString(2);
226
+                columnTypes.put(colName, colType);
227
+            }
228
+        } catch (SQLException e) {
229
+            if (!e.getMessage().contains("Table does not exist") && !e.getMessage().contains("invalid name")) {
230
+                log.debug("查询超级表列类型失败: {}.{} | {}", dbName, stableName, e.getMessage());
231
+            }
232
+        }
233
+        return columnTypes;
234
+    }
235
+
198
     // ==========================================
236
     // ==========================================
199
     // 初始化表结构(按列存储,无 ext_data)
237
     // 初始化表结构(按列存储,无 ext_data)
200
     // ==========================================
238
     // ==========================================
273
             for (Map.Entry<String, Object> entry : data.entrySet()) {
311
             for (Map.Entry<String, Object> entry : data.entrySet()) {
274
                 String key = entry.getKey();
312
                 String key = entry.getKey();
275
                 if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
313
                 if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
276
-                    // 统一使用 VARCHAR,避免类型推断导致的问题
277
-                    columnTypes.put(key, "VARCHAR");
314
+                    columnTypes.put(key, getValueType(entry.getValue()));
278
                 }
315
                 }
279
             }
316
             }
280
         }
317
         }
292
             return null;
329
             return null;
293
         }
330
         }
294
 
331
 
332
+        // 获取数据库中实际的列类型,用于格式化值
333
+        Map<String, String> dbColumnTypes = getStableColumnTypes(dbName, superTableName);
334
+
295
         StringBuilder sql = new StringBuilder();
335
         StringBuilder sql = new StringBuilder();
296
         sql.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
336
         sql.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
297
            .append(" (ts, surfacename");
337
            .append(" (ts, surfacename");
311
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
351
             for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
312
                 String col = entry.getKey();
352
                 String col = entry.getKey();
313
                 Object value = data.get(col);
353
                 Object value = data.get(col);
314
-                sql.append(", ").append(formatValue(value, col, entry.getValue()));
354
+                // 优先使用数据库中的列类型,其次使用数据类型推断
355
+                String colType = dbColumnTypes.getOrDefault(col, entry.getValue());
356
+                sql.append(", ").append(formatValue(value, col, colType));
315
             }
357
             }
316
             sql.append("),");
358
             sql.append("),");
317
             hasData = true;
359
             hasData = true;
607
      */
649
      */
608
     public void clearStableColumnCache() {
650
     public void clearStableColumnCache() {
609
         stableColumnCache.clear();
651
         stableColumnCache.clear();
652
+        stableColumnTypeCache.clear();
610
         log.info("清除了 TdEngine 超级表结构缓存");
653
         log.info("清除了 TdEngine 超级表结构缓存");
611
     }
654
     }
612
 
655
 

Loading…
Откажи
Сачувај