2 Комити

Аутор SHA1 Порука Датум
  lenovo 1a1e92631a Merge branch 'mqy20260511' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511 пре 1 дан
  lenovo 265a8fbc46 Merge branch 'master' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511 пре 2 дана

+ 263
- 93
iot-platform/src/main/java/com/iot/platform/service/TDengineService.java Прегледај датотеку

@@ -9,13 +9,9 @@ import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.beans.factory.annotation.Qualifier;
10 10
 import org.springframework.stereotype.Service;
11 11
 
12
-import java.io.ByteArrayOutputStream;
13 12
 import java.sql.*;
14 13
 import java.util.*;
15 14
 import java.util.concurrent.*;
16
-import java.util.zip.*;
17
-
18
-import com.fasterxml.jackson.databind.ObjectMapper;
19 15
 
20 16
 @Service("tdengineService")
21 17
 public class TDengineService {
@@ -34,15 +30,15 @@ public class TDengineService {
34 30
     private HikariDataSource dataSource;
35 31
     private volatile boolean dataSourceInitialized = false;
36 32
 
37
-    // === 新增:缓存超级表结构 (key = dbName.stableName) ===
33
+    // === 缓存超级表结构 (key = dbName.stableName) ===
38 34
     private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
39 35
     private static final int MAX_CACHE_SIZE = 1000;
40 36
 
41
-    // JSON 列名,用于存储所有动态字段
42
-    private static final String JSON_COLUMN_NAME = "ext_data";
37
+    // 允许的列名字符
38
+    private static final String ALLOWED_COLUMNS = "^[a-zA-Z_][a-zA-Z0-9_]*$";
43 39
 
44
-    // ObjectMapper 线程安全,可复用
45
-    private static final ObjectMapper objectMapper = new ObjectMapper();
40
+    // TDengine 超级表最大列数限制
41
+    private static final int MAX_COLUMNS_PER_STABLE = 4096;
46 42
 
47 43
     private synchronized void initDataSource() {
48 44
         if (dataSourceInitialized) {
@@ -72,7 +68,7 @@ public class TDengineService {
72 68
     }
73 69
 
74 70
     /**
75
-     * 手动获取连接
71
+     * 获取连接
76 72
      */
77 73
     public Connection getConnection() throws SQLException {
78 74
         if (!dataSourceInitialized) {
@@ -86,7 +82,7 @@ public class TDengineService {
86 82
     }
87 83
 
88 84
     /**
89
-     * 手动关闭连接
85
+     * 关闭连接
90 86
      */
91 87
     public void closeConnection(Connection conn) {
92 88
         if (conn != null) {
@@ -111,7 +107,7 @@ public class TDengineService {
111 107
     }
112 108
 
113 109
     private boolean isValidFieldName(String name) {
114
-        return name != null && name.matches("^[a-zA-Z0-9_]+$");
110
+        return name != null && name.matches(ALLOWED_COLUMNS);
115 111
     }
116 112
 
117 113
     // === 缓存工具方法 ===
@@ -129,15 +125,12 @@ public class TDengineService {
129 125
             return cached;
130 126
         }
131 127
 
132
-        // 缓存未命中,查 DB
133 128
         Set<String> columns = loadStableColumnsFromDB(dbName, stableName);
134
-        if (!columns.isEmpty()) {
135
-            if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
136
-                stableColumnCache.clear();
137
-                log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
138
-            }
139
-            stableColumnCache.put(key, columns);
129
+        if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
130
+            stableColumnCache.clear();
131
+            log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
140 132
         }
133
+        stableColumnCache.put(key, columns);
141 134
         return columns;
142 135
     }
143 136
 
@@ -151,7 +144,7 @@ public class TDengineService {
151 144
         try {
152 145
             conn = getConnection();
153 146
             stmt = conn.createStatement();
154
-            stmt.setQueryTimeout(5); // ⭐ 防止无限等待
147
+            stmt.setQueryTimeout(5);
155 148
             rs = stmt.executeQuery(sql);
156 149
             while (rs.next()) {
157 150
                 columns.add(rs.getString(1));
@@ -169,7 +162,7 @@ public class TDengineService {
169 162
     }
170 163
 
171 164
     // ==========================================
172
-    // 初始化表结构
165
+    // 初始化表结构(按列存储,无 ext_data)
173 166
     // ==========================================
174 167
     private void initTableStructure(String dbName, String supertablename, String table, Set<String> fieldNames) throws SQLException {
175 168
         Connection conn = null;
@@ -181,10 +174,9 @@ public class TDengineService {
181 174
 
182 175
             stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName));
183 176
 
184
-            // 创建超级表:固定 ts + surfacename + ext_data(VARCHAR)
185
-            // 注:TDengine 2.x JSON 类型只能用于 TAGS,不能用于普通列
177
+            // 创建超级表:固定 ts + surfacename,无 ext_data 列
186 178
             String stableSql = String.format(
187
-                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64), ext_data VARCHAR(8192)) TAGS (location BINARY(64))",
179
+                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location BINARY(64))",
188 180
                     wrapName(dbName),
189 181
                     wrapName(supertablename)
190 182
             );
@@ -192,16 +184,12 @@ public class TDengineService {
192 184
 
193 185
             // 更新缓存:固定列
194 186
             String key = getStableKey(dbName, supertablename);
195
-            if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
196
-                stableColumnCache.clear();
197
-                log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
198
-            }
199 187
             Set<String> fixedCols = new HashSet<>();
200 188
             fixedCols.add("ts");
201 189
             fixedCols.add("surfacename");
202
-            fixedCols.add("ext_data");
203 190
             stableColumnCache.put(key, fixedCols);
204 191
 
192
+            // 创建子表
205 193
             String tableSql = String.format(
206 194
                     "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
207 195
                     wrapName(dbName),
@@ -219,17 +207,17 @@ public class TDengineService {
219 207
     }
220 208
 
221 209
     // ==========================================
222
-    // 批量插入
210
+    // 批量插入(按列存储)
223 211
     // ==========================================
224 212
     public void insertBatch(String dbName, String table, List<Map<String, Object>> dataList) throws SQLException {
225 213
         if (dataList == null || dataList.isEmpty()) return;
226 214
 
227 215
         String supertablename = table.contains("_") ? table.substring(0, table.lastIndexOf('_')) : table;
228 216
 
229
-        // 初始化表结构(固定为 ts + surfacename + ext_data)
217
+        // 初始化表结构
230 218
         initTableStructure(dbName, supertablename, table, Collections.emptySet());
231 219
 
232
-        // 分批处理,每批最多 50 条记录(避免 SQL 过长)
220
+        // 分批处理,每批最多 50 条记录
233 221
         int batchSize = 50;
234 222
         int totalBatches = (dataList.size() + batchSize - 1) / batchSize;
235 223
 
@@ -248,23 +236,65 @@ public class TDengineService {
248 236
      * 内部方法:插入一批数据
249 237
      */
250 238
     private void insertBatchInternal(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) throws SQLException {
251
-        // 确保表存在(可能有竞态条件)
252 239
         ensureTableExists(dbName, supertablename, table);
253 240
 
241
+        // 收集所有动态列名及其数据类型
242
+        Map<String, String> columnTypes = new LinkedHashMap<>();
243
+        for (Map<String, Object> data : dataList) {
244
+            if (data == null) continue;
245
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
246
+                String key = entry.getKey();
247
+                if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
248
+                    columnTypes.put(key, getValueType(entry.getValue()));
249
+                }
250
+            }
251
+        }
252
+
253
+        // 获取当前列数
254
+        Set<String> existingColumns = getStableColumns(dbName, supertablename);
255
+        int totalColumns = existingColumns.size() + columnTypes.size();
256
+
257
+        // 检查列数是否超限
258
+        if (totalColumns > MAX_COLUMNS_PER_STABLE) {
259
+            log.error("超级表总列数({})超过限制({}),跳过本次插入 | 表: {}.{}",
260
+                    totalColumns, MAX_COLUMNS_PER_STABLE, dbName, table);
261
+            return;
262
+        }
263
+
264
+        // 确保所有列都存在(自动 ALTER 添加新列)
265
+        ensureColumnsExist(dbName, supertablename, columnTypes);
266
+
267
+        // 重新获取列数以确保一致性
268
+        existingColumns = getStableColumns(dbName, supertablename);
269
+        if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
270
+            log.error("超级表列数已达上限({}),停止插入数据 | 表: {}.{}",
271
+                    MAX_COLUMNS_PER_STABLE, dbName, table);
272
+            return;
273
+        }
274
+
275
+        // 构建 SQL
254 276
         StringBuilder sqlBuilder = new StringBuilder();
255 277
         sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
256
-                .append(" (ts, surfacename, ext_data) VALUES ");
278
+                .append(" (ts, surfacename");
279
+
280
+        for (String col : columnTypes.keySet()) {
281
+            sqlBuilder.append(", ").append(wrapName(col));
282
+        }
283
+        sqlBuilder.append(") VALUES ");
257 284
 
258 285
         boolean hasData = false;
259 286
         for (Map<String, Object> data : dataList) {
260 287
             if (data == null) continue;
261 288
 
262
-            // 将动态字段转为 JSON 并 GZIP 压缩
263
-            String extJson = buildDynamicJson(data);
264
-            String compressed = compressToBase64(extJson);
289
+            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("'");
265 290
 
266
-            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
267
-                    .append(escapeValue(compressed)).append("'),");
291
+            for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
292
+                String col = entry.getKey();
293
+                String colType = entry.getValue();
294
+                Object value = data.get(col);
295
+                sqlBuilder.append(", ").append(formatValue(value, col, colType));
296
+            }
297
+            sqlBuilder.append("),");
268 298
             hasData = true;
269 299
         }
270 300
 
@@ -281,7 +311,6 @@ public class TDengineService {
281 311
             stmt.setQueryTimeout(30);
282 312
             stmt.executeUpdate(finalSql);
283 313
         } catch (SQLException e) {
284
-            // 表不存在时尝试重建表后重试
285 314
             if (e.getMessage().contains("Table does not exist")) {
286 315
                 log.warn("表不存在,重建表: {}", table);
287 316
                 initTableStructure(dbName, supertablename, table, Collections.emptySet());
@@ -296,20 +325,44 @@ public class TDengineService {
296 325
     }
297 326
 
298 327
     /**
299
-     * 重试插入(表重建后)
328
+     * 重试插入
300 329
      */
301 330
     private void insertBatchRetry(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) throws SQLException {
331
+        Map<String, String> columnTypes = new LinkedHashMap<>();
332
+        for (Map<String, Object> data : dataList) {
333
+            if (data == null) continue;
334
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
335
+                String key = entry.getKey();
336
+                if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
337
+                    columnTypes.put(key, getValueType(entry.getValue()));
338
+                }
339
+            }
340
+        }
341
+
342
+        ensureColumnsExist(dbName, supertablename, columnTypes);
343
+
302 344
         StringBuilder sqlBuilder = new StringBuilder();
303 345
         sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
304
-                .append(" (ts, surfacename, ext_data) VALUES ");
346
+                .append(" (ts, surfacename");
347
+
348
+        for (String col : columnTypes.keySet()) {
349
+            sqlBuilder.append(", ").append(wrapName(col));
350
+        }
351
+        sqlBuilder.append(") VALUES ");
305 352
 
306 353
         boolean hasData = false;
307 354
         for (Map<String, Object> data : dataList) {
308 355
             if (data == null) continue;
309
-            String extJson = buildDynamicJson(data);
310
-            String compressed = compressToBase64(extJson);
311
-            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
312
-                    .append(escapeValue(compressed)).append("'),");
356
+
357
+            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("'");
358
+
359
+            for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
360
+                String col = entry.getKey();
361
+                String colType = entry.getValue();
362
+                Object value = data.get(col);
363
+                sqlBuilder.append(", ").append(formatValue(value, col, colType));
364
+            }
365
+            sqlBuilder.append("),");
313 366
             hasData = true;
314 367
         }
315 368
 
@@ -334,7 +387,166 @@ public class TDengineService {
334 387
     }
335 388
 
336 389
     /**
337
-     * 确保表存在(检查+创建)
390
+     * 判断是否为保留列(ts, surfacename)
391
+     */
392
+    private boolean isReservedColumn(String columnName) {
393
+        return "ts".equalsIgnoreCase(columnName) || "surfacename".equalsIgnoreCase(columnName);
394
+    }
395
+
396
+    /**
397
+     * 判断值类型,返回 TDengine 对应的数据类型
398
+     */
399
+    private String getValueType(Object value) {
400
+        if (value == null) {
401
+            return "VARCHAR";
402
+        }
403
+        if (value instanceof Boolean) {
404
+            return "BOOL";
405
+        }
406
+        if (value instanceof Integer || value instanceof Long) {
407
+            return "BIGINT";
408
+        }
409
+        if (value instanceof Float || value instanceof Double) {
410
+            return "DOUBLE";
411
+        }
412
+        // 时间类型(Date, Timestamp, LocalDateTime 等)
413
+        if (value instanceof java.util.Date || value instanceof java.sql.Timestamp
414
+                || value instanceof java.time.LocalDateTime || value instanceof java.time.Instant) {
415
+            return "TIMESTAMP";
416
+        }
417
+        return "VARCHAR";
418
+    }
419
+
420
+    /**
421
+     * 根据 TDengine 列类型获取创建列的 SQL 类型
422
+     */
423
+    private String getColumnTypeForDDL(String tdType, String columnName) {
424
+        // 特殊列名使用较长长度
425
+        if ("device_id".equalsIgnoreCase(columnName)) {
426
+            return "VARCHAR(100)";
427
+        }
428
+        switch (tdType) {
429
+            case "BOOL":
430
+            case "BIGINT":
431
+            case "DOUBLE":
432
+            case "TIMESTAMP":
433
+                return tdType;
434
+            case "VARCHAR":
435
+            default:
436
+                return "VARCHAR(38)";
437
+        }
438
+    }
439
+
440
+    /**
441
+     * 格式化值
442
+     */
443
+    private String formatValue(Object value, String columnName, String columnType) {
444
+        if (value == null) {
445
+            return "NULL";
446
+        }
447
+        String strValue = value.toString();
448
+
449
+        // 数字类型不做长度校验
450
+        if ("DOUBLE".equals(columnType) || "BIGINT".equals(columnType) || "BOOL".equals(columnType)) {
451
+            if (value instanceof Boolean) {
452
+                return (Boolean) value ? "1" : "0";
453
+            }
454
+            return strValue;
455
+        }
456
+
457
+        // TIMESTAMP 类型处理
458
+        if ("TIMESTAMP".equals(columnType)) {
459
+            if (value instanceof java.util.Date) {
460
+                return String.valueOf(((java.util.Date) value).getTime());
461
+            }
462
+            if (value instanceof java.sql.Timestamp) {
463
+                return String.valueOf(((java.sql.Timestamp) value).getTime());
464
+            }
465
+            if (value instanceof java.time.LocalDateTime) {
466
+                return String.valueOf(((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.of("+8")).toEpochMilli());
467
+            }
468
+            if (value instanceof java.time.Instant) {
469
+                return String.valueOf(((java.time.Instant) value).toEpochMilli());
470
+            }
471
+        }
472
+
473
+        // 字符串类型校验长度
474
+        int maxLen = "device_id".equalsIgnoreCase(columnName) ? 100 : 38;
475
+        if (strValue.length() > maxLen) {
476
+            log.warn("字段值超长,跳过存储 | 列: {} | 值长度: {} | 最大: {} | 前{}字符: {}",
477
+                    columnName, strValue.length(), maxLen, maxLen, strValue.substring(0, maxLen));
478
+            return "NULL";
479
+        }
480
+        return "'" + escapeValue(strValue) + "'";
481
+    }
482
+
483
+    /**
484
+     * 确保列存在,如有新列则 ALTER 添加(根据类型添加对应列)
485
+     */
486
+    private void ensureColumnsExist(String dbName, String supertablename, Map<String, String> columnTypes) throws SQLException {
487
+        Set<String> existingColumns = getStableColumns(dbName, supertablename);
488
+
489
+        // 检查是否已达到列数上限
490
+        if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
491
+            log.error("超级表列数已达上限({}),停止添加新列 | 表: {}.{}", MAX_COLUMNS_PER_STABLE, dbName, supertablename);
492
+            return;
493
+        }
494
+
495
+        Connection conn = null;
496
+        Statement stmt = null;
497
+        try {
498
+            conn = getConnection();
499
+            stmt = conn.createStatement();
500
+            stmt.setQueryTimeout(10);
501
+
502
+            for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
503
+                String col = entry.getKey();
504
+                String colType = entry.getValue();
505
+
506
+                // 再次检查列数上限
507
+                if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
508
+                    log.error("超级表列数已达上限({}),跳过添加列 | 列: {} | 表: {}.{}",
509
+                            MAX_COLUMNS_PER_STABLE, col, dbName, supertablename);
510
+                    continue;
511
+                }
512
+
513
+                if (!existingColumns.contains(col)) {
514
+                    String ddlType = getColumnTypeForDDL(colType, col);
515
+                    String alterSql = String.format(
516
+                            "ALTER STABLE %s.%s ADD COLUMN %s %s",
517
+                            wrapName(dbName),
518
+                            wrapName(supertablename),
519
+                            wrapName(col),
520
+                            ddlType
521
+                    );
522
+                    try {
523
+                        stmt.executeUpdate(alterSql);
524
+                        log.info("超级表添加列: {}.{} -> {} ({})", dbName, supertablename, col, ddlType);
525
+                        existingColumns.add(col);
526
+                    } catch (SQLException e) {
527
+                        if (e.getMessage().contains("already exists") || e.getMessage().contains("duplicate")) {
528
+                            existingColumns.add(col);
529
+                        } else if (e.getMessage().contains("too many columns")) {
530
+                            log.error("超级表列数超限,无法添加列 | 列: {} | 表: {}.{} | 错误: {}",
531
+                                    col, dbName, supertablename, e.getMessage());
532
+                        } else {
533
+                            throw e;
534
+                        }
535
+                    }
536
+
537
+                    // 更新缓存
538
+                    String key = getStableKey(dbName, supertablename);
539
+                    stableColumnCache.put(key, existingColumns);
540
+                }
541
+            }
542
+        } finally {
543
+            if (stmt != null) try { stmt.close(); } catch (SQLException ignored) {}
544
+            closeConnection(conn);
545
+        }
546
+    }
547
+
548
+    /**
549
+     * 确保表存在
338 550
      */
339 551
     private void ensureTableExists(String dbName, String supertablename, String table) throws SQLException {
340 552
         Connection conn = null;
@@ -345,7 +557,7 @@ public class TDengineService {
345 557
             stmt = conn.createStatement();
346 558
             stmt.setQueryTimeout(5);
347 559
 
348
-            // 检查超级表是否存在(使用 information_schema)
560
+            // 检查超级表是否存在
349 561
             String checkStableSql = String.format(
350 562
                     "SELECT * FROM information_schema.ins_tables WHERE table_name = '%s' AND db_name = '%s'",
351 563
                     escapeValue(supertablename), escapeValue(dbName));
@@ -390,47 +602,6 @@ public class TDengineService {
390 602
         }
391 603
     }
392 604
 
393
-    /**
394
-     * 将动态字段构建为 JSON 字符串(排除 topic、ts、surfacename)
395
-     */
396
-    private String buildDynamicJson(Map<String, Object> data) {
397
-        if (data == null) return "{}";
398
-        Map<String, Object> dynamic = new LinkedHashMap<>();
399
-        for (Map.Entry<String, Object> entry : data.entrySet()) {
400
-            String key = entry.getKey();
401
-            if ("topic".equalsIgnoreCase(key) || "ts".equalsIgnoreCase(key)
402
-                    || "surfacename".equalsIgnoreCase(key)) {
403
-                continue;
404
-            }
405
-            Object value = entry.getValue();
406
-            if (value != null && !value.toString().trim().isEmpty()) {
407
-                dynamic.put(key, value);
408
-            }
409
-        }
410
-        try {
411
-            return objectMapper.writeValueAsString(dynamic);
412
-        } catch (Exception e) {
413
-            throw new IllegalStateException("JSON 序列化失败,数据可能丢失: " + dynamic, e);
414
-        }
415
-    }
416
-
417
-    /**
418
-     * GZIP 压缩字符串
419
-     */
420
-    private String compressToBase64(String data) {
421
-        if (data == null || data.isEmpty()) return "";
422
-        try {
423
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
424
-            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
425
-                gzip.write(data.getBytes("UTF-8"));
426
-            }
427
-            byte[] compressed = Base64.getEncoder().encode(bos.toByteArray());
428
-            return new String(compressed);
429
-        } catch (Exception e) {
430
-            throw new IllegalStateException("GZIP 压缩失败: " + e.getMessage(), e);
431
-        }
432
-    }
433
-
434 605
     @Deprecated
435 606
     public void addToBatch(String dbName, String table, Map<String, Object> dataMap) throws SQLException {
436 607
         List<Map<String, Object>> list = new ArrayList<>();
@@ -438,9 +609,9 @@ public class TDengineService {
438 609
         insertBatch(dbName, table, list);
439 610
     }
440 611
 
441
-    // ==========================================
442
-    // 可选:清除缓存(用于运维或测试)
443
-    // ==========================================
612
+    /**
613
+     * 清除缓存
614
+     */
444 615
     public void clearStableColumnCache() {
445 616
         stableColumnCache.clear();
446 617
         log.info("清除了 TDengine 超级表结构缓存");
@@ -448,7 +619,6 @@ public class TDengineService {
448 619
 
449 620
     public void close() {
450 621
         log.info("关闭 TDengine 服务...");
451
-        // batchExecutor 由 Spring 管理生命周期,不在这里 shutdown
452 622
         if (dataSource != null) {
453 623
             dataSource.close();
454 624
         }

+ 195
- 45
iot-platform/src/test/java/com/iot/platform/service/TDengineServiceTest.java Прегледај датотеку

@@ -83,69 +83,225 @@ class TDengineServiceTest {
83 83
     }
84 84
 
85 85
     @Test
86
-    @DisplayName("buildDynamicJson: 应排除 topic/ts/surfacename")
87
-    void buildDynamicJson_excludesReservedKeys() throws Exception {
88
-        Method method = TDengineService.class.getDeclaredMethod("buildDynamicJson", Map.class);
86
+    @DisplayName("getValueType: Boolean 返回 BOOL")
87
+    void getValueType_boolean_returnsBool() throws Exception {
88
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
89 89
         method.setAccessible(true);
90 90
 
91
-        Map<String, Object> data = new LinkedHashMap<>();
92
-        data.put("topic", "test/topic");
93
-        data.put("ts", "1234567890");
94
-        data.put("surfacename", "s1");
95
-        data.put("temperature", "25.5");
96
-        data.put("humidity", "60");
91
+        assertThat(method.invoke(tdengineService, Boolean.TRUE)).isEqualTo("BOOL");
92
+        assertThat(method.invoke(tdengineService, Boolean.FALSE)).isEqualTo("BOOL");
93
+    }
94
+
95
+    @Test
96
+    @DisplayName("getValueType: Integer 返回 BIGINT")
97
+    void getValueType_integer_returnsBigint() throws Exception {
98
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
99
+        method.setAccessible(true);
97 100
 
98
-        String result = (String) method.invoke(tdengineService, data);
99
-        assertThat(result).contains("temperature", "humidity");
100
-        assertThat(result).doesNotContain("topic", "ts", "surfacename");
101
+        assertThat(method.invoke(tdengineService, 123)).isEqualTo("BIGINT");
102
+        assertThat(method.invoke(tdengineService, Integer.valueOf(456))).isEqualTo("BIGINT");
101 103
     }
102 104
 
103 105
     @Test
104
-    @DisplayName("buildDynamicJson: 空值和空字符串应被过滤")
105
-    void buildDynamicJson_filtersEmptyValues() throws Exception {
106
-        Method method = TDengineService.class.getDeclaredMethod("buildDynamicJson", Map.class);
106
+    @DisplayName("getValueType: Long 返回 BIGINT")
107
+    void getValueType_long_returnsBigint() throws Exception {
108
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
107 109
         method.setAccessible(true);
108 110
 
109
-        Map<String, Object> data = new LinkedHashMap<>();
110
-        data.put("valid", "value");
111
-        data.put("empty", "");
112
-        data.put("nullVal", null);
111
+        assertThat(method.invoke(tdengineService, 123L)).isEqualTo("BIGINT");
112
+    }
113
+
114
+    @Test
115
+    @DisplayName("getValueType: Float 返回 DOUBLE")
116
+    void getValueType_float_returnsDouble() throws Exception {
117
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
118
+        method.setAccessible(true);
119
+
120
+        assertThat(method.invoke(tdengineService, 12.34f)).isEqualTo("DOUBLE");
121
+    }
122
+
123
+    @Test
124
+    @DisplayName("getValueType: Double 返回 DOUBLE")
125
+    void getValueType_double_returnsDouble() throws Exception {
126
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
127
+        method.setAccessible(true);
113 128
 
114
-        String result = (String) method.invoke(tdengineService, data);
115
-        assertThat(result).contains("valid");
116
-        assertThat(result).doesNotContain("empty", "nullVal");
129
+        assertThat(method.invoke(tdengineService, 56.78)).isEqualTo("DOUBLE");
130
+        assertThat(method.invoke(tdengineService, Double.valueOf(99.99))).isEqualTo("DOUBLE");
117 131
     }
118 132
 
119 133
     @Test
120
-    @DisplayName("buildDynamicJson: null 输入应返回空 JSON")
121
-    void buildDynamicJson_nullInput_returnsEmptyJson() throws Exception {
122
-        Method method = TDengineService.class.getDeclaredMethod("buildDynamicJson", Map.class);
134
+    @DisplayName("getValueType: java.util.Date 返回 TIMESTAMP")
135
+    void getValueType_date_returnsTimestamp() throws Exception {
136
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
123 137
         method.setAccessible(true);
124 138
 
125
-        assertThat(method.invoke(tdengineService, (Map<String, Object>) null)).isEqualTo("{}");
139
+        assertThat(method.invoke(tdengineService, new java.util.Date())).isEqualTo("TIMESTAMP");
126 140
     }
127 141
 
128 142
     @Test
129
-    @DisplayName("compressToBase64: 应压缩并返回非空字符串")
130
-    void compressToBase64_compressesData() throws Exception {
131
-        Method method = TDengineService.class.getDeclaredMethod("compressToBase64", String.class);
143
+    @DisplayName("getValueType: java.sql.Timestamp 返回 TIMESTAMP")
144
+    void getValueType_timestamp_returnsTimestamp() throws Exception {
145
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
132 146
         method.setAccessible(true);
133 147
 
134
-        String input = "{\"temperature\":25.5,\"humidity\":60}";
135
-        String result = (String) method.invoke(tdengineService, input);
148
+        assertThat(method.invoke(tdengineService, new java.sql.Timestamp(System.currentTimeMillis()))).isEqualTo("TIMESTAMP");
149
+    }
150
+
151
+    @Test
152
+    @DisplayName("getValueType: java.time.LocalDateTime 返回 TIMESTAMP")
153
+    void getValueType_localDateTime_returnsTimestamp() throws Exception {
154
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
155
+        method.setAccessible(true);
136 156
 
137
-        assertThat(result).isNotEmpty();
138
-        assertThat(result).isNotEqualTo(input); // should be compressed
157
+        assertThat(method.invoke(tdengineService, java.time.LocalDateTime.now())).isEqualTo("TIMESTAMP");
139 158
     }
140 159
 
141 160
     @Test
142
-    @DisplayName("compressToBase64: null 或空字符串应返回空")
143
-    void compressToBase64_nullOrEmpty_returnsEmpty() throws Exception {
144
-        Method method = TDengineService.class.getDeclaredMethod("compressToBase64", String.class);
161
+    @DisplayName("getValueType: java.time.Instant 返回 TIMESTAMP")
162
+    void getValueType_instant_returnsTimestamp() throws Exception {
163
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
145 164
         method.setAccessible(true);
146 165
 
147
-        assertThat(method.invoke(tdengineService, (String) null)).isEqualTo("");
148
-        assertThat(method.invoke(tdengineService, "")).isEqualTo("");
166
+        assertThat(method.invoke(tdengineService, java.time.Instant.now())).isEqualTo("TIMESTAMP");
167
+    }
168
+
169
+    @Test
170
+    @DisplayName("getValueType: String 返回 VARCHAR")
171
+    void getValueType_string_returnsVarchar() throws Exception {
172
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
173
+        method.setAccessible(true);
174
+
175
+        assertThat(method.invoke(tdengineService, "test")).isEqualTo("VARCHAR");
176
+        assertThat(method.invoke(tdengineService, "")).isEqualTo("VARCHAR");
177
+    }
178
+
179
+    @Test
180
+    @DisplayName("getValueType: null 返回 VARCHAR")
181
+    void getValueType_null_returnsVarchar() throws Exception {
182
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
183
+        method.setAccessible(true);
184
+
185
+        assertThat(method.invoke(tdengineService, null)).isEqualTo("VARCHAR");
186
+    }
187
+
188
+    @Test
189
+    @DisplayName("getColumnTypeForDDL: BOOL/BIGINT/DOUBLE/TIMESTAMP 直接返回")
190
+    void getColumnTypeForDDL_directTypes() throws Exception {
191
+        Method method = TDengineService.class.getDeclaredMethod("getColumnTypeForDDL", String.class);
192
+        method.setAccessible(true);
193
+
194
+        assertThat(method.invoke(tdengineService, "BOOL")).isEqualTo("BOOL");
195
+        assertThat(method.invoke(tdengineService, "BIGINT")).isEqualTo("BIGINT");
196
+        assertThat(method.invoke(tdengineService, "DOUBLE")).isEqualTo("DOUBLE");
197
+        assertThat(method.invoke(tdengineService, "TIMESTAMP")).isEqualTo("TIMESTAMP");
198
+    }
199
+
200
+    @Test
201
+    @DisplayName("getColumnTypeForDDL: VARCHAR 返回 VARCHAR(32)")
202
+    void getColumnTypeForDDL_varchar_returnsWithLength() throws Exception {
203
+        Method method = TDengineService.class.getDeclaredMethod("getColumnTypeForDDL", String.class);
204
+        method.setAccessible(true);
205
+
206
+        assertThat(method.invoke(tdengineService, "VARCHAR")).isEqualTo("VARCHAR(32)");
207
+    }
208
+
209
+    @Test
210
+    @DisplayName("formatValue: null 返回 NULL")
211
+    void formatValue_null_returnsNull() throws Exception {
212
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
213
+        method.setAccessible(true);
214
+
215
+        assertThat(method.invoke(tdengineService, null, "col", "VARCHAR")).isEqualTo("NULL");
216
+    }
217
+
218
+    @Test
219
+    @DisplayName("formatValue: DOUBLE 类型直接返回数字字符串")
220
+    void formatValue_double_returnsNumberString() throws Exception {
221
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
222
+        method.setAccessible(true);
223
+
224
+        assertThat(method.invoke(tdengineService, 12.34, "col", "DOUBLE")).isEqualTo("12.34");
225
+    }
226
+
227
+    @Test
228
+    @DisplayName("formatValue: BIGINT 类型直接返回数字字符串")
229
+    void formatValue_bigint_returnsNumberString() throws Exception {
230
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
231
+        method.setAccessible(true);
232
+
233
+        assertThat(method.invoke(tdengineService, 123L, "col", "BIGINT")).isEqualTo("123");
234
+    }
235
+
236
+    @Test
237
+    @DisplayName("formatValue: BOOL 类型 true 返回 1")
238
+    void formatValue_bool_true_returns1() throws Exception {
239
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
240
+        method.setAccessible(true);
241
+
242
+        assertThat(method.invoke(tdengineService, true, "col", "BOOL")).isEqualTo("1");
243
+        assertThat(method.invoke(tdengineService, false, "col", "BOOL")).isEqualTo("0");
244
+    }
245
+
246
+    @Test
247
+    @DisplayName("formatValue: VARCHAR 类型超过32字符返回 NULL")
248
+    void formatValue_varchar_exceedsLength_returnsNull() throws Exception {
249
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
250
+        method.setAccessible(true);
251
+
252
+        String longStr = "12345678901234567890123456789012345"; // 35 chars
253
+        assertThat(method.invoke(tdengineService, longStr, "col", "VARCHAR")).isEqualTo("NULL");
254
+    }
255
+
256
+    @Test
257
+    @DisplayName("formatValue: VARCHAR 类型32字符内正常返回")
258
+    void formatValue_varchar_withinLength_returnsQuoted() throws Exception {
259
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
260
+        method.setAccessible(true);
261
+
262
+        assertThat(method.invoke(tdengineService, "test", "col", "VARCHAR")).isEqualTo("'test'");
263
+    }
264
+
265
+    @Test
266
+    @DisplayName("formatValue: VARCHAR 类型包含单引号应转义")
267
+    void formatValue_varchar_withQuote_escapes() throws Exception {
268
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
269
+        method.setAccessible(true);
270
+
271
+        assertThat(method.invoke(tdengineService, "it's", "col", "VARCHAR")).isEqualTo("'it''s'");
272
+    }
273
+
274
+    @Test
275
+    @DisplayName("formatValue: TIMESTAMP 类型 java.util.Date 转为毫秒")
276
+    void formatValue_timestamp_date_returnsMillis() throws Exception {
277
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
278
+        method.setAccessible(true);
279
+
280
+        java.util.Date date = new java.util.Date(1234567890000L);
281
+        String result = (String) method.invoke(tdengineService, date, "col", "TIMESTAMP");
282
+        assertThat(result).isEqualTo("1234567890000");
283
+    }
284
+
285
+    @Test
286
+    @DisplayName("isReservedColumn: ts 和 surfacename 返回 true")
287
+    void isReservedColumn_reservedNames() throws Exception {
288
+        Method method = TDengineService.class.getDeclaredMethod("isReservedColumn", String.class);
289
+        method.setAccessible(true);
290
+
291
+        assertThat(method.invoke(tdengineService, "ts")).isEqualTo(true);
292
+        assertThat(method.invoke(tdengineService, "TS")).isEqualTo(true);
293
+        assertThat(method.invoke(tdengineService, "surfacename")).isEqualTo(true);
294
+        assertThat(method.invoke(tdengineService, "Surfacename")).isEqualTo(true);
295
+    }
296
+
297
+    @Test
298
+    @DisplayName("isReservedColumn: 其他列名返回 false")
299
+    void isReservedColumn_otherNames() throws Exception {
300
+        Method method = TDengineService.class.getDeclaredMethod("isReservedColumn", String.class);
301
+        method.setAccessible(true);
302
+
303
+        assertThat(method.invoke(tdengineService, "temperature")).isEqualTo(false);
304
+        assertThat(method.invoke(tdengineService, "voltage")).isEqualTo(false);
149 305
     }
150 306
 
151 307
     @Test
@@ -161,8 +317,6 @@ class TDengineServiceTest {
161 317
         Map<String, Object> data = new HashMap<>();
162 318
         data.put("key", "value");
163 319
 
164
-        // addToBatch 现在声明 throws SQLException,尝试连接 TDengine 会失败
165
-        // 本地环境缺少 TDengine 驱动时会抛出各种异常(UnsatisfiedLinkError / NoClassDefFoundError / IllegalStateException)
166 320
         assertThatThrownBy(() -> tdengineService.addToBatch("db", "table", data))
167 321
                 .isInstanceOf(Throwable.class);
168 322
     }
@@ -171,21 +325,18 @@ class TDengineServiceTest {
171 325
     @DisplayName("clearStableColumnCache: 应清空缓存不抛异常")
172 326
     void clearStableColumnCache_clearsWithoutError() {
173 327
         tdengineService.clearStableColumnCache();
174
-        // Should not throw
175 328
     }
176 329
 
177 330
     @Test
178 331
     @DisplayName("close: 应关闭数据源不抛异常")
179 332
     void close_closesDataSource() {
180 333
         tdengineService.close();
181
-        // Should not throw even if dataSource is null
182 334
     }
183 335
 
184 336
     @Test
185 337
     @DisplayName("closeConnection: null 连接应安全处理")
186 338
     void closeConnection_nullConnection_safe() {
187 339
         tdengineService.closeConnection(null);
188
-        // Should not throw
189 340
     }
190 341
 
191 342
     @Test
@@ -193,8 +344,7 @@ class TDengineServiceTest {
193 344
     void getConnection_initFails_throwsException() {
194 345
         when(tdengineConfig.getUrl()).thenReturn("jdbc:TAOS://invalid:6030/test");
195 346
 
196
-        // 本地环境缺少 TDengine 驱动时会抛出 NoClassDefFoundError / UnsatisfiedLinkError
197 347
         assertThatThrownBy(() -> tdengineService.getConnection())
198 348
                 .isInstanceOf(Throwable.class);
199 349
     }
200
-}
350
+}

Loading…
Откажи
Сачувај