Ver código fonte

Merge branch 'master' of http://114.215.146.132:3000/Mqy/Wisdom-Data into mqy20260511

 Conflicts:
	iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java
mqy20260511
lenovo 2 dias atrás
pai
commit
265a8fbc46

+ 21
- 8
deploy/deploy.sh Ver arquivo

@@ -19,6 +19,7 @@ REMOTE_DIR="${REMOTE_DIR:-/opt/iot-platform}"
19 19
 APP_NAME="iot-platform"
20 20
 LOCAL_JAR=""
21 21
 DO_BUILD=false
22
+NO_ROLLBACK=false
22 23
 
23 24
 # 颜色
24 25
 RED='\033[0;31m'
@@ -33,14 +34,19 @@ while [[ $# -gt 0 ]]; do
33 34
             DO_BUILD=true
34 35
             shift
35 36
             ;;
37
+        --no-rollback)
38
+            NO_ROLLBACK=true
39
+            shift
40
+            ;;
36 41
         --jar)
37 42
             LOCAL_JAR="$2"
38 43
             shift 2
39 44
             ;;
40 45
         --help|-h)
41
-            echo "用法: ./deploy.sh [--build] [--jar <path>]"
42
-            echo "  --build     本地执行 mvn clean package"
43
-            echo "  --jar       指定要部署的 jar 文件路径"
46
+            echo "用法: ./deploy.sh [--build] [--no-rollback] [--jar <path>]"
47
+            echo "  --build        本地执行 mvn clean package"
48
+            echo "  --no-rollback  健康检查失败时不自动回滚,保留新版本用于排查"
49
+            echo "  --jar          指定要部署的 jar 文件路径"
44 50
             exit 0
45 51
             ;;
46 52
         *)
@@ -127,7 +133,7 @@ echo -e "${GREEN}[5/6] 服务已启动${NC}"
127 133
 
128 134
 # 步骤 6: 健康检查
129 135
 echo -e "${YELLOW}[6/6] 健康检查...${NC}"
130
-if ssh "${SERVER_USER}@${SERVER_HOST}" "bash ${REMOTE_DIR}/bin/health-check.sh localhost 8887 30"; then
136
+if ssh "${SERVER_USER}@${SERVER_HOST}" "bash ${REMOTE_DIR}/bin/health-check.sh localhost 8887 60"; then
131 137
     echo -e "${GREEN}[6/6] 健康检查通过,部署成功!${NC}"
132 138
     echo ""
133 139
     echo "========================================"
@@ -142,10 +148,17 @@ if ssh "${SERVER_USER}@${SERVER_HOST}" "bash ${REMOTE_DIR}/bin/health-check.sh l
142 148
     ssh "${SERVER_USER}@${SERVER_HOST}" "systemctl status ${APP_NAME} --no-pager"
143 149
     exit 0
144 150
 else
145
-    echo -e "${RED}[6/6] 健康检查失败! 执行回滚...${NC}"
151
+    echo -e "${RED}[6/6] 健康检查失败!${NC}"
152
+
153
+    if [ "$NO_ROLLBACK" = true ]; then
154
+        echo -e "${YELLOW}[no-rollback] 已启用不回滚模式,保留新版本用于排查${NC}"
155
+        echo -e "${YELLOW}排查命令:${NC}"
156
+        echo "  ssh ${SERVER_USER}@${SERVER_HOST} 'journalctl -u ${APP_NAME} -f'"
157
+        echo "  ssh ${SERVER_USER}@${SERVER_HOST} 'systemctl status ${APP_NAME}'"
158
+        exit 1
159
+    fi
146 160
 
147
-    # 回滚
148
-    echo -e "${YELLOW}[rollback] 停止服务...${NC}"
161
+    echo -e "${YELLOW}[rollback] 执行回滚...${NC}"
149 162
     ssh "${SERVER_USER}@${SERVER_HOST}" "systemctl stop ${APP_NAME} || true"
150 163
 
151 164
     if ssh "${SERVER_USER}@${SERVER_HOST}" "test -f ${REMOTE_DIR}/backup/${BACKUP_NAME}"; then
@@ -156,7 +169,7 @@ else
156 169
         "
157 170
         sleep 3
158 171
 
159
-        if ssh "${SERVER_USER}@${SERVER_HOST}" "bash ${REMOTE_DIR}/bin/health-check.sh localhost 8887 30"; then
172
+        if ssh "${SERVER_USER}@${SERVER_HOST}" "bash ${REMOTE_DIR}/bin/health-check.sh localhost 8887 60"; then
160 173
             echo -e "${GREEN}[rollback] 回滚成功,旧版本已恢复${NC}"
161 174
         else
162 175
             echo -e "${RED}[rollback] 回滚后健康检查仍失败,请手动排查!${NC}"

+ 3
- 3
iot-platform/pom.xml Ver arquivo

@@ -3,9 +3,9 @@
3 3
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 4
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 5
     <parent>
6
-        <artifactId>ruoyi</artifactId>
7
-        <groupId>com.ruoyi</groupId>
8
-        <version>3.9.0</version>
6
+        <groupId>com.iot.platform</groupId>
7
+        <artifactId>wisdom-data</artifactId>
8
+        <version>1.0.0</version>
9 9
     </parent>
10 10
     <modelVersion>4.0.0</modelVersion>
11 11
     <packaging>jar</packaging>

+ 264
- 94
iot-platform/src/main/java/com/iot/platform/service/TDengineService.java Ver arquivo

@@ -9,15 +9,11 @@ import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.beans.factory.annotation.Qualifier;
10 10
 import org.springframework.stereotype.Service;
11 11
 
12
-import java.io.ByteArrayOutputStream;
13 12
 import java.sql.*;
14 13
 import java.util.*;
15 14
 import java.util.concurrent.*;
16
-import java.util.zip.*;
17 15
 
18
-import com.fasterxml.jackson.databind.ObjectMapper;
19
-
20
-@Service
16
+@Service("tdengineService")
21 17
 public class TDengineService {
22 18
     private static final Logger log = LoggerFactory.getLogger(TDengineService.class);
23 19
 
@@ -34,15 +30,15 @@ public class TDengineService {
34 30
     private HikariDataSource dataSource;
35 31
     private volatile boolean dataSourceInitialized = false;
36 32
 
37
-    // === 新增:缓存超级表结构 (key = dbName.stableName) ===
33
+    // === 缓存超级表结构 (key = dbName.stableName) ===
38 34
     private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
39 35
     private static final int MAX_CACHE_SIZE = 1000;
40 36
 
41
-    // JSON 列名,用于存储所有动态字段
42
-    private static final String JSON_COLUMN_NAME = "ext_data";
37
+    // 允许的列名字符
38
+    private static final String ALLOWED_COLUMNS = "^[a-zA-Z_][a-zA-Z0-9_]*$";
43 39
 
44
-    // ObjectMapper 线程安全,可复用
45
-    private static final ObjectMapper objectMapper = new ObjectMapper();
40
+    // TDengine 超级表最大列数限制
41
+    private static final int MAX_COLUMNS_PER_STABLE = 4096;
46 42
 
47 43
     private synchronized void initDataSource() {
48 44
         if (dataSourceInitialized) {
@@ -72,7 +68,7 @@ public class TDengineService {
72 68
     }
73 69
 
74 70
     /**
75
-     * 手动获取连接
71
+     * 获取连接
76 72
      */
77 73
     public Connection getConnection() throws SQLException {
78 74
         if (!dataSourceInitialized) {
@@ -86,7 +82,7 @@ public class TDengineService {
86 82
     }
87 83
 
88 84
     /**
89
-     * 手动关闭连接
85
+     * 关闭连接
90 86
      */
91 87
     public void closeConnection(Connection conn) {
92 88
         if (conn != null) {
@@ -111,7 +107,7 @@ public class TDengineService {
111 107
     }
112 108
 
113 109
     private boolean isValidFieldName(String name) {
114
-        return name != null && name.matches("^[a-zA-Z0-9_]+$");
110
+        return name != null && name.matches(ALLOWED_COLUMNS);
115 111
     }
116 112
 
117 113
     // === 缓存工具方法 ===
@@ -129,15 +125,12 @@ public class TDengineService {
129 125
             return cached;
130 126
         }
131 127
 
132
-        // 缓存未命中,查 DB
133 128
         Set<String> columns = loadStableColumnsFromDB(dbName, stableName);
134
-        if (!columns.isEmpty()) {
135
-            if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
136
-                stableColumnCache.clear();
137
-                log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
138
-            }
139
-            stableColumnCache.put(key, columns);
129
+        if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
130
+            stableColumnCache.clear();
131
+            log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
140 132
         }
133
+        stableColumnCache.put(key, columns);
141 134
         return columns;
142 135
     }
143 136
 
@@ -151,7 +144,7 @@ public class TDengineService {
151 144
         try {
152 145
             conn = getConnection();
153 146
             stmt = conn.createStatement();
154
-            stmt.setQueryTimeout(5); // ⭐ 防止无限等待
147
+            stmt.setQueryTimeout(5);
155 148
             rs = stmt.executeQuery(sql);
156 149
             while (rs.next()) {
157 150
                 columns.add(rs.getString(1));
@@ -169,7 +162,7 @@ public class TDengineService {
169 162
     }
170 163
 
171 164
     // ==========================================
172
-    // 初始化表结构
165
+    // 初始化表结构(按列存储,无 ext_data)
173 166
     // ==========================================
174 167
     private void initTableStructure(String dbName, String supertablename, String table, Set<String> fieldNames) throws SQLException {
175 168
         Connection conn = null;
@@ -181,10 +174,9 @@ public class TDengineService {
181 174
 
182 175
             stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName));
183 176
 
184
-            // 创建超级表:固定 ts + surfacename + ext_data(VARCHAR)
185
-            // 注:TDengine 2.x JSON 类型只能用于 TAGS,不能用于普通列
177
+            // 创建超级表:固定 ts + surfacename,无 ext_data 列
186 178
             String stableSql = String.format(
187
-                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64), ext_data VARCHAR(8192)) TAGS (location BINARY(64))",
179
+                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location BINARY(64))",
188 180
                     wrapName(dbName),
189 181
                     wrapName(supertablename)
190 182
             );
@@ -192,16 +184,12 @@ public class TDengineService {
192 184
 
193 185
             // 更新缓存:固定列
194 186
             String key = getStableKey(dbName, supertablename);
195
-            if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
196
-                stableColumnCache.clear();
197
-                log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
198
-            }
199 187
             Set<String> fixedCols = new HashSet<>();
200 188
             fixedCols.add("ts");
201 189
             fixedCols.add("surfacename");
202
-            fixedCols.add("ext_data");
203 190
             stableColumnCache.put(key, fixedCols);
204 191
 
192
+            // 创建子表
205 193
             String tableSql = String.format(
206 194
                     "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
207 195
                     wrapName(dbName),
@@ -219,17 +207,17 @@ public class TDengineService {
219 207
     }
220 208
 
221 209
     // ==========================================
222
-    // 批量插入
210
+    // 批量插入(按列存储)
223 211
     // ==========================================
224 212
     public void insertBatch(String dbName, String table, List<Map<String, Object>> dataList) throws SQLException {
225 213
         if (dataList == null || dataList.isEmpty()) return;
226 214
 
227 215
         String supertablename = table.contains("_") ? table.substring(0, table.lastIndexOf('_')) : table;
228 216
 
229
-        // 初始化表结构(固定为 ts + surfacename + ext_data)
217
+        // 初始化表结构
230 218
         initTableStructure(dbName, supertablename, table, Collections.emptySet());
231 219
 
232
-        // 分批处理,每批最多 50 条记录(避免 SQL 过长)
220
+        // 分批处理,每批最多 50 条记录
233 221
         int batchSize = 50;
234 222
         int totalBatches = (dataList.size() + batchSize - 1) / batchSize;
235 223
 
@@ -248,23 +236,65 @@ public class TDengineService {
248 236
      * 内部方法:插入一批数据
249 237
      */
250 238
     private void insertBatchInternal(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) throws SQLException {
251
-        // 确保表存在(可能有竞态条件)
252 239
         ensureTableExists(dbName, supertablename, table);
253 240
 
241
+        // 收集所有动态列名及其数据类型
242
+        Map<String, String> columnTypes = new LinkedHashMap<>();
243
+        for (Map<String, Object> data : dataList) {
244
+            if (data == null) continue;
245
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
246
+                String key = entry.getKey();
247
+                if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
248
+                    columnTypes.put(key, getValueType(entry.getValue()));
249
+                }
250
+            }
251
+        }
252
+
253
+        // 获取当前列数
254
+        Set<String> existingColumns = getStableColumns(dbName, supertablename);
255
+        int totalColumns = existingColumns.size() + columnTypes.size();
256
+
257
+        // 检查列数是否超限
258
+        if (totalColumns > MAX_COLUMNS_PER_STABLE) {
259
+            log.error("超级表总列数({})超过限制({}),跳过本次插入 | 表: {}.{}",
260
+                    totalColumns, MAX_COLUMNS_PER_STABLE, dbName, table);
261
+            return;
262
+        }
263
+
264
+        // 确保所有列都存在(自动 ALTER 添加新列)
265
+        ensureColumnsExist(dbName, supertablename, columnTypes);
266
+
267
+        // 重新获取列数以确保一致性
268
+        existingColumns = getStableColumns(dbName, supertablename);
269
+        if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
270
+            log.error("超级表列数已达上限({}),停止插入数据 | 表: {}.{}",
271
+                    MAX_COLUMNS_PER_STABLE, dbName, table);
272
+            return;
273
+        }
274
+
275
+        // 构建 SQL
254 276
         StringBuilder sqlBuilder = new StringBuilder();
255 277
         sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
256
-                .append(" (ts, surfacename, ext_data) VALUES ");
278
+                .append(" (ts, surfacename");
279
+
280
+        for (String col : columnTypes.keySet()) {
281
+            sqlBuilder.append(", ").append(wrapName(col));
282
+        }
283
+        sqlBuilder.append(") VALUES ");
257 284
 
258 285
         boolean hasData = false;
259 286
         for (Map<String, Object> data : dataList) {
260 287
             if (data == null) continue;
261 288
 
262
-            // 将动态字段转为 JSON 并 GZIP 压缩
263
-            String extJson = buildDynamicJson(data);
264
-            String compressed = compressToBase64(extJson);
289
+            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("'");
265 290
 
266
-            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
267
-                    .append(escapeValue(compressed)).append("'),");
291
+            for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
292
+                String col = entry.getKey();
293
+                String colType = entry.getValue();
294
+                Object value = data.get(col);
295
+                sqlBuilder.append(", ").append(formatValue(value, col, colType));
296
+            }
297
+            sqlBuilder.append("),");
268 298
             hasData = true;
269 299
         }
270 300
 
@@ -281,7 +311,6 @@ public class TDengineService {
281 311
             stmt.setQueryTimeout(30);
282 312
             stmt.executeUpdate(finalSql);
283 313
         } catch (SQLException e) {
284
-            // 表不存在时尝试重建表后重试
285 314
             if (e.getMessage().contains("Table does not exist")) {
286 315
                 log.warn("表不存在,重建表: {}", table);
287 316
                 initTableStructure(dbName, supertablename, table, Collections.emptySet());
@@ -296,20 +325,44 @@ public class TDengineService {
296 325
     }
297 326
 
298 327
     /**
299
-     * 重试插入(表重建后)
328
+     * 重试插入
300 329
      */
301 330
     private void insertBatchRetry(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) throws SQLException {
331
+        Map<String, String> columnTypes = new LinkedHashMap<>();
332
+        for (Map<String, Object> data : dataList) {
333
+            if (data == null) continue;
334
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
335
+                String key = entry.getKey();
336
+                if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
337
+                    columnTypes.put(key, getValueType(entry.getValue()));
338
+                }
339
+            }
340
+        }
341
+
342
+        ensureColumnsExist(dbName, supertablename, columnTypes);
343
+
302 344
         StringBuilder sqlBuilder = new StringBuilder();
303 345
         sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
304
-                .append(" (ts, surfacename, ext_data) VALUES ");
346
+                .append(" (ts, surfacename");
347
+
348
+        for (String col : columnTypes.keySet()) {
349
+            sqlBuilder.append(", ").append(wrapName(col));
350
+        }
351
+        sqlBuilder.append(") VALUES ");
305 352
 
306 353
         boolean hasData = false;
307 354
         for (Map<String, Object> data : dataList) {
308 355
             if (data == null) continue;
309
-            String extJson = buildDynamicJson(data);
310
-            String compressed = compressToBase64(extJson);
311
-            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
312
-                    .append(escapeValue(compressed)).append("'),");
356
+
357
+            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("'");
358
+
359
+            for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
360
+                String col = entry.getKey();
361
+                String colType = entry.getValue();
362
+                Object value = data.get(col);
363
+                sqlBuilder.append(", ").append(formatValue(value, col, colType));
364
+            }
365
+            sqlBuilder.append("),");
313 366
             hasData = true;
314 367
         }
315 368
 
@@ -334,7 +387,166 @@ public class TDengineService {
334 387
     }
335 388
 
336 389
     /**
337
-     * 确保表存在(检查+创建)
390
+     * 判断是否为保留列(ts, surfacename)
391
+     */
392
+    private boolean isReservedColumn(String columnName) {
393
+        return "ts".equalsIgnoreCase(columnName) || "surfacename".equalsIgnoreCase(columnName);
394
+    }
395
+
396
+    /**
397
+     * 判断值类型,返回 TDengine 对应的数据类型
398
+     */
399
+    private String getValueType(Object value) {
400
+        if (value == null) {
401
+            return "VARCHAR";
402
+        }
403
+        if (value instanceof Boolean) {
404
+            return "BOOL";
405
+        }
406
+        if (value instanceof Integer || value instanceof Long) {
407
+            return "BIGINT";
408
+        }
409
+        if (value instanceof Float || value instanceof Double) {
410
+            return "DOUBLE";
411
+        }
412
+        // 时间类型(Date, Timestamp, LocalDateTime 等)
413
+        if (value instanceof java.util.Date || value instanceof java.sql.Timestamp
414
+                || value instanceof java.time.LocalDateTime || value instanceof java.time.Instant) {
415
+            return "TIMESTAMP";
416
+        }
417
+        return "VARCHAR";
418
+    }
419
+
420
+    /**
421
+     * 根据 TDengine 列类型获取创建列的 SQL 类型
422
+     */
423
+    private String getColumnTypeForDDL(String tdType, String columnName) {
424
+        // 特殊列名使用较长长度
425
+        if ("device_id".equalsIgnoreCase(columnName)) {
426
+            return "VARCHAR(100)";
427
+        }
428
+        switch (tdType) {
429
+            case "BOOL":
430
+            case "BIGINT":
431
+            case "DOUBLE":
432
+            case "TIMESTAMP":
433
+                return tdType;
434
+            case "VARCHAR":
435
+            default:
436
+                return "VARCHAR(38)";
437
+        }
438
+    }
439
+
440
+    /**
441
+     * 格式化值
442
+     */
443
+    private String formatValue(Object value, String columnName, String columnType) {
444
+        if (value == null) {
445
+            return "NULL";
446
+        }
447
+        String strValue = value.toString();
448
+
449
+        // 数字类型不做长度校验
450
+        if ("DOUBLE".equals(columnType) || "BIGINT".equals(columnType) || "BOOL".equals(columnType)) {
451
+            if (value instanceof Boolean) {
452
+                return (Boolean) value ? "1" : "0";
453
+            }
454
+            return strValue;
455
+        }
456
+
457
+        // TIMESTAMP 类型处理
458
+        if ("TIMESTAMP".equals(columnType)) {
459
+            if (value instanceof java.util.Date) {
460
+                return String.valueOf(((java.util.Date) value).getTime());
461
+            }
462
+            if (value instanceof java.sql.Timestamp) {
463
+                return String.valueOf(((java.sql.Timestamp) value).getTime());
464
+            }
465
+            if (value instanceof java.time.LocalDateTime) {
466
+                return String.valueOf(((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.of("+8")).toEpochMilli());
467
+            }
468
+            if (value instanceof java.time.Instant) {
469
+                return String.valueOf(((java.time.Instant) value).toEpochMilli());
470
+            }
471
+        }
472
+
473
+        // 字符串类型校验长度
474
+        int maxLen = "device_id".equalsIgnoreCase(columnName) ? 100 : 38;
475
+        if (strValue.length() > maxLen) {
476
+            log.warn("字段值超长,跳过存储 | 列: {} | 值长度: {} | 最大: {} | 前{}字符: {}",
477
+                    columnName, strValue.length(), maxLen, maxLen, strValue.substring(0, maxLen));
478
+            return "NULL";
479
+        }
480
+        return "'" + escapeValue(strValue) + "'";
481
+    }
482
+
483
+    /**
484
+     * 确保列存在,如有新列则 ALTER 添加(根据类型添加对应列)
485
+     */
486
+    private void ensureColumnsExist(String dbName, String supertablename, Map<String, String> columnTypes) throws SQLException {
487
+        Set<String> existingColumns = getStableColumns(dbName, supertablename);
488
+
489
+        // 检查是否已达到列数上限
490
+        if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
491
+            log.error("超级表列数已达上限({}),停止添加新列 | 表: {}.{}", MAX_COLUMNS_PER_STABLE, dbName, supertablename);
492
+            return;
493
+        }
494
+
495
+        Connection conn = null;
496
+        Statement stmt = null;
497
+        try {
498
+            conn = getConnection();
499
+            stmt = conn.createStatement();
500
+            stmt.setQueryTimeout(10);
501
+
502
+            for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
503
+                String col = entry.getKey();
504
+                String colType = entry.getValue();
505
+
506
+                // 再次检查列数上限
507
+                if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
508
+                    log.error("超级表列数已达上限({}),跳过添加列 | 列: {} | 表: {}.{}",
509
+                            MAX_COLUMNS_PER_STABLE, col, dbName, supertablename);
510
+                    continue;
511
+                }
512
+
513
+                if (!existingColumns.contains(col)) {
514
+                    String ddlType = getColumnTypeForDDL(colType, col);
515
+                    String alterSql = String.format(
516
+                            "ALTER STABLE %s.%s ADD COLUMN %s %s",
517
+                            wrapName(dbName),
518
+                            wrapName(supertablename),
519
+                            wrapName(col),
520
+                            ddlType
521
+                    );
522
+                    try {
523
+                        stmt.executeUpdate(alterSql);
524
+                        log.info("超级表添加列: {}.{} -> {} ({})", dbName, supertablename, col, ddlType);
525
+                        existingColumns.add(col);
526
+                    } catch (SQLException e) {
527
+                        if (e.getMessage().contains("already exists") || e.getMessage().contains("duplicate")) {
528
+                            existingColumns.add(col);
529
+                        } else if (e.getMessage().contains("too many columns")) {
530
+                            log.error("超级表列数超限,无法添加列 | 列: {} | 表: {}.{} | 错误: {}",
531
+                                    col, dbName, supertablename, e.getMessage());
532
+                        } else {
533
+                            throw e;
534
+                        }
535
+                    }
536
+
537
+                    // 更新缓存
538
+                    String key = getStableKey(dbName, supertablename);
539
+                    stableColumnCache.put(key, existingColumns);
540
+                }
541
+            }
542
+        } finally {
543
+            if (stmt != null) try { stmt.close(); } catch (SQLException ignored) {}
544
+            closeConnection(conn);
545
+        }
546
+    }
547
+
548
+    /**
549
+     * 确保表存在
338 550
      */
339 551
     private void ensureTableExists(String dbName, String supertablename, String table) throws SQLException {
340 552
         Connection conn = null;
@@ -345,7 +557,7 @@ public class TDengineService {
345 557
             stmt = conn.createStatement();
346 558
             stmt.setQueryTimeout(5);
347 559
 
348
-            // 检查超级表是否存在(使用 information_schema)
560
+            // 检查超级表是否存在
349 561
             String checkStableSql = String.format(
350 562
                     "SELECT * FROM information_schema.ins_tables WHERE table_name = '%s' AND db_name = '%s'",
351 563
                     escapeValue(supertablename), escapeValue(dbName));
@@ -390,47 +602,6 @@ public class TDengineService {
390 602
         }
391 603
     }
392 604
 
393
-    /**
394
-     * 将动态字段构建为 JSON 字符串(排除 topic、ts、surfacename)
395
-     */
396
-    private String buildDynamicJson(Map<String, Object> data) {
397
-        if (data == null) return "{}";
398
-        Map<String, Object> dynamic = new LinkedHashMap<>();
399
-        for (Map.Entry<String, Object> entry : data.entrySet()) {
400
-            String key = entry.getKey();
401
-            if ("topic".equalsIgnoreCase(key) || "ts".equalsIgnoreCase(key)
402
-                    || "surfacename".equalsIgnoreCase(key)) {
403
-                continue;
404
-            }
405
-            Object value = entry.getValue();
406
-            if (value != null && !value.toString().trim().isEmpty()) {
407
-                dynamic.put(key, value);
408
-            }
409
-        }
410
-        try {
411
-            return objectMapper.writeValueAsString(dynamic);
412
-        } catch (Exception e) {
413
-            throw new IllegalStateException("JSON 序列化失败,数据可能丢失: " + dynamic, e);
414
-        }
415
-    }
416
-
417
-    /**
418
-     * GZIP 压缩字符串
419
-     */
420
-    private String compressToBase64(String data) {
421
-        if (data == null || data.isEmpty()) return "";
422
-        try {
423
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
424
-            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
425
-                gzip.write(data.getBytes("UTF-8"));
426
-            }
427
-            byte[] compressed = Base64.getEncoder().encode(bos.toByteArray());
428
-            return new String(compressed);
429
-        } catch (Exception e) {
430
-            throw new IllegalStateException("GZIP 压缩失败: " + e.getMessage(), e);
431
-        }
432
-    }
433
-
434 605
     @Deprecated
435 606
     public void addToBatch(String dbName, String table, Map<String, Object> dataMap) throws SQLException {
436 607
         List<Map<String, Object>> list = new ArrayList<>();
@@ -438,9 +609,9 @@ public class TDengineService {
438 609
         insertBatch(dbName, table, list);
439 610
     }
440 611
 
441
-    // ==========================================
442
-    // 可选:清除缓存(用于运维或测试)
443
-    // ==========================================
612
+    /**
613
+     * 清除缓存
614
+     */
444 615
     public void clearStableColumnCache() {
445 616
         stableColumnCache.clear();
446 617
         log.info("清除了 TDengine 超级表结构缓存");
@@ -448,7 +619,6 @@ public class TDengineService {
448 619
 
449 620
     public void close() {
450 621
         log.info("关闭 TDengine 服务...");
451
-        // batchExecutor 由 Spring 管理生命周期,不在这里 shutdown
452 622
         if (dataSource != null) {
453 623
             dataSource.close();
454 624
         }

+ 0
- 327
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Ver arquivo

@@ -1,327 +0,0 @@
1
-package com.iot.platform.task;
2
-
3
-import com.iot.platform.domain.SysCar;
4
-import com.iot.platform.domain.SysDevice;
5
-import com.iot.platform.domain.SysDeviceControl;
6
-import com.iot.platform.config.IotProperties;
7
-import com.iot.platform.service.*;
8
-import org.slf4j.Logger;
9
-import org.slf4j.LoggerFactory;
10
-import org.springframework.dao.DataAccessException;
11
-import org.springframework.data.redis.RedisConnectionFailureException;
12
-import org.springframework.data.redis.core.RedisCallback;
13
-import org.springframework.data.redis.core.ScanOptions;
14
-import org.springframework.data.redis.core.StringRedisTemplate;
15
-import org.springframework.scheduling.annotation.Scheduled;
16
-import org.springframework.stereotype.Component;
17
-import org.springframework.web.client.RestClientException;
18
-import org.springframework.web.client.RestTemplate;
19
-
20
-import java.util.*;
21
-import java.util.concurrent.TimeUnit;
22
-
23
-@Component
24
-public class VehicleSyncTask {
25
-
26
-//    private static final Logger log = LoggerFactory.getLogger(VehicleSyncTask.class);
27
-//
28
-//    private final SysCarService sysCarService;
29
-//    private final SysDeviceService sysDeviceService;
30
-//    private final StringRedisTemplate stringRedisTemplate;
31
-//    private final SysrealtimeService sysrealtimeService;
32
-//    private final SysDeviceVoService sysDeviceVoService;
33
-//    private final SysDeviceControlService sysDeviceControlService;
34
-//    private final RestTemplate restTemplate;
35
-//    private final IotProperties iotProperties;
36
-//
37
-//    public VehicleSyncTask(SysCarService sysCarService,
38
-//                           SysDeviceService sysDeviceService,
39
-//                           StringRedisTemplate stringRedisTemplate,
40
-//                           SysrealtimeService sysrealtimeService,
41
-//                           SysDeviceVoService sysDeviceVoService,
42
-//                           SysDeviceControlService sysDeviceControlService,
43
-//                           RestTemplate restTemplate,
44
-//                           IotProperties iotProperties) {
45
-//        this.sysCarService = sysCarService;
46
-//        this.sysDeviceService = sysDeviceService;
47
-//        this.stringRedisTemplate = stringRedisTemplate;
48
-//        this.sysrealtimeService = sysrealtimeService;
49
-//        this.sysDeviceVoService = sysDeviceVoService;
50
-//        this.sysDeviceControlService = sysDeviceControlService;
51
-//        this.restTemplate = restTemplate;
52
-//        this.iotProperties = iotProperties;
53
-//    }
54
-//
55
-//    private boolean tryLock(String lockKey, long expireSeconds) {
56
-//        Boolean acquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", expireSeconds, TimeUnit.SECONDS);
57
-//        return Boolean.TRUE.equals(acquired);
58
-//    }
59
-//
60
-//    private void unlock(String lockKey) {
61
-//        Boolean deleted = stringRedisTemplate.delete(lockKey);
62
-//        if (!Boolean.TRUE.equals(deleted)) {
63
-//            log.warn("分布式锁释放失败: {}", lockKey);
64
-//        }
65
-//    }
66
-//
67
-//    /**
68
-//     * 更新车辆的控制器信息
69
-//     * 30秒更新一次
70
-//     */
71
-//    @Scheduled(fixedDelay = 30000)
72
-//    public void updateSysCar() {
73
-//        String lockKey = "lock:vehicle-sync:updateSysCar";
74
-//        if (!tryLock(lockKey, 60)) {
75
-//            log.debug("获取锁失败,跳过本次执行: {}", lockKey);
76
-//            return;
77
-//        }
78
-//        try {
79
-//            doUpdateSysCar();
80
-//        } finally {
81
-//            unlock(lockKey);
82
-//        }
83
-//    }
84
-//
85
-//    private void doUpdateSysCar() {
86
-//        List<SysCar> sysCarList = sysCarService.selectcontrollerId();
87
-//        for (SysCar sysCar : sysCarList) {
88
-//            try {
89
-//                if (sysCar.getControllerId() == null || sysCar.getControllerId().isEmpty()) {
90
-//                    continue;
91
-//                }
92
-//                SysDevice latitude = sysDeviceService.selectsysdevice(sysCar.getControllerId(), "纬度");
93
-//                SysDevice longitude = sysDeviceService.selectsysdevice(sysCar.getControllerId(), "经度");
94
-//
95
-//                String redisKeyPattern = "workorder:coordinate:" + sysCar.getControllerId() + ":*";
96
-//                Set<String> keys = scanKeys(redisKeyPattern);
97
-//
98
-//                if (keys == null || keys.isEmpty()) {
99
-//                    updateCarPosition(sysCar, latitude, longitude);
100
-//                } else {
101
-//                    for (String key : keys) {
102
-//                        Map<Object, Object> coordinateMap = stringRedisTemplate.opsForHash().entries(key);
103
-//                        Object cachedLat = coordinateMap.get("latitude");
104
-//                        Object cachedLon = coordinateMap.get("longitude");
105
-//                        if (cachedLat != null && cachedLat.equals(latitude.getV()) && cachedLon != null && cachedLon.equals(longitude.getV())) {
106
-//                            continue;
107
-//                        }
108
-//                        updateCarPosition(sysCar, latitude, longitude);
109
-//                    }
110
-//                }
111
-//            } catch (DataAccessException e) {
112
-//                log.error("更新车辆位置失败 carId={}: {}", sysCar.getCarId(), e.getMessage(), e);
113
-//            } catch (Exception e) {
114
-//                log.error("更新车辆位置异常 carId={}: {}", sysCar.getCarId(), e.getMessage(), e);
115
-//            }
116
-//        }
117
-//    }
118
-//
119
-//    private void updateCarPosition(SysCar sysCar, SysDevice latitude, SysDevice longitude) {
120
-//        String redisKey = "workorder:coordinate:" + sysCar.getControllerId();
121
-//        stringRedisTemplate.opsForHash().put(redisKey, "latitude", latitude.getV());
122
-//        stringRedisTemplate.opsForHash().put(redisKey, "longitude", longitude.getV());
123
-//        stringRedisTemplate.persist(redisKey);
124
-//
125
-//        String position = latitude.getV() + "," + longitude.getV();
126
-//        sysCarService.updatecarposition(position, sysCar.getCarId());
127
-//        String url = iotProperties.getMqtt().getVehicleTriggerUrl() + "?carId=" + sysCar.getCarId();
128
-//        try {
129
-//            restTemplate.postForObject(url, null, String.class);
130
-//        } catch (RestClientException e) {
131
-//            log.warn("触发webhook失败 carId={}: {}", sysCar.getCarId(), e.getMessage());
132
-//        }
133
-//    }
134
-//
135
-//    private Set<String> scanKeys(String pattern) {
136
-//        Set<String> keys = new HashSet<>();
137
-//        ScanOptions options = ScanOptions.scanOptions().match(pattern).count(100).build();
138
-//        try {
139
-//            stringRedisTemplate.execute((RedisCallback<Void>) connection -> {
140
-//                org.springframework.data.redis.core.Cursor<byte[]> cursor = connection.scan(options);
141
-//                while (cursor.hasNext()) {
142
-//                    keys.add(new String(cursor.next()));
143
-//                }
144
-//                cursor.close();
145
-//                return null;
146
-//            });
147
-//        } catch (RedisConnectionFailureException e) {
148
-//            log.warn("Redis SCAN 连接失败 pattern={}: {}", pattern, e.getMessage());
149
-//        } catch (Exception e) {
150
-//            log.error("Redis SCAN 失败 pattern={}: {}", pattern, e.getMessage(), e);
151
-//        }
152
-//        return keys;
153
-//    }
154
-//
155
-////    @Scheduled(fixedDelay = 30000)
156
-////    public void insertDevice() {
157
-////        String lockKey = "lock:vehicle-sync:insertDevice";
158
-////        if (!tryLock(lockKey, 60)) {
159
-////            log.debug("获取锁失败,跳过本次执行: {}", lockKey);
160
-////            return;
161
-////        }
162
-////        try {
163
-////            doInsertDevice();
164
-////        } finally {
165
-////            unlock(lockKey);
166
-////        }
167
-////    }
168
-////
169
-////    private void doInsertDevice() {
170
-////        Set<String> activeKeys;
171
-////        try {
172
-////            activeKeys = stringRedisTemplate.opsForSet().members("DSB:active:devices");
173
-////        } catch (Exception e) {
174
-////            log.warn("redis中无数据");
175
-////            return;
176
-////        }
177
-////        try {
178
-////            if (activeKeys == null || activeKeys.isEmpty()) {
179
-////                log.info("redis中无数据");
180
-////                return;
181
-////            }
182
-////            List<SysDeviceControl> sysDeviceControlList = sysDeviceControlService.selectdevice("device");
183
-////            for (String redisKey : activeKeys) {
184
-////                Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
185
-////                if (dataMap == null || dataMap.isEmpty()) {
186
-////                    stringRedisTemplate.opsForSet().remove("DSB:active:devices", redisKey);
187
-////                    continue;
188
-////                }
189
-////                String[] parts = redisKey.split(":", 3);
190
-////                if (parts.length != 3 || !"DSB".equals(parts[0])) {
191
-////                    log.warn("跳过非法 key: {}", redisKey);
192
-////                    continue;
193
-////                }
194
-////                String controllerId = parts[1];
195
-////                Integer count = sysDeviceVoService.selectcount(controllerId);
196
-////                if (count != null && count > 0) {
197
-////                    StringBuilder keyvalue = new StringBuilder();
198
-////                    for (int i = 0; i < sysDeviceControlList.size(); i++) {
199
-////                        for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
200
-////                            String fieldKey = entry.getKey().toString();
201
-////                            String fieldvalue = entry.getValue().toString();
202
-////                            if (sysDeviceControlList.get(i).getControllerName().equals(fieldKey)) {
203
-////                                keyvalue.append(fieldKey).append("=/'").append(fieldvalue).append("/'");
204
-////                                if (i < sysDeviceControlList.size() - 1) {
205
-////                                    keyvalue.append(",");
206
-////                                }
207
-////                            }
208
-////                        }
209
-////                    }
210
-////                    boolean updated = sysDeviceVoService.updatesysdevice(keyvalue.toString(), controllerId);
211
-////                    if (!updated) {
212
-////                        log.warn("更新设备配置失败: controllerId={}", controllerId);
213
-////                    }
214
-////                } else {
215
-////                    StringBuilder key = new StringBuilder();
216
-////                    StringBuilder value = new StringBuilder();
217
-////                    for (int i = 0; i < sysDeviceControlList.size(); i++) {
218
-////                        for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
219
-////                            String fieldKey = entry.getKey().toString();
220
-////                            String fieldvalue = entry.getValue().toString();
221
-////                            if (sysDeviceControlList.get(i).getControllerName().equals(fieldKey)) {
222
-////                                key.append(fieldKey);
223
-////                                value.append(fieldvalue);
224
-////                                if (i < sysDeviceControlList.size() - 1) {
225
-////                                    key.append(",");
226
-////                                    value.append(",");
227
-////                                }
228
-////                            }
229
-////                        }
230
-////                    }
231
-////                    boolean inserted = sysDeviceVoService.insertdevice(key.toString(), value.toString());
232
-////                    if (!inserted) {
233
-////                        log.warn("插入设备配置失败: controllerId={}", controllerId);
234
-////                    }
235
-////                }
236
-////            }
237
-////        } catch (RedisConnectionFailureException e) {
238
-////            log.warn("Redis 连接失败,跳过本次同步: {}", e.getMessage());
239
-////        } catch (DataAccessException e) {
240
-////            log.error("数据库操作失败: {}", e.getMessage(), e);
241
-////        } catch (Exception e) {
242
-////            log.error("同步设备配置失败: {}", e.getMessage(), e);
243
-////        }
244
-////    }
245
-//
246
-//    /**
247
-//     * 更新数据库实时数据
248
-//     */
249
-//    @Scheduled(fixedDelay = 30000)
250
-//    public void syncRedisToMySQL() {
251
-//        String lockKey = "lock:vehicle-sync:syncRedisToMySQL";
252
-//        if (!tryLock(lockKey, 60)) {
253
-//            log.debug("获取锁失败,跳过本次执行: {}", lockKey);
254
-//            return;
255
-//        }
256
-//        try {
257
-//            doSyncRedisToMySQL();
258
-//        } finally {
259
-//            unlock(lockKey);
260
-//        }
261
-//    }
262
-//
263
-//    private void doSyncRedisToMySQL() {
264
-//        Set<String> activeKeys = stringRedisTemplate.opsForSet().members("DSB:active:devices");
265
-//        if (activeKeys == null || activeKeys.isEmpty()) return;
266
-//
267
-//        for (String redisKey : activeKeys) {
268
-//            try {
269
-//                Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
270
-//                if (dataMap == null || dataMap.isEmpty()) {
271
-//                    stringRedisTemplate.opsForSet().remove("DSB:active:devices", redisKey);
272
-//                    continue;
273
-//                }
274
-//
275
-//                String[] parts = redisKey.split(":", 3);
276
-//                if (parts.length != 3 || !"DSB".equals(parts[0])) {
277
-//                    log.warn("跳过非法 key: {}", redisKey);
278
-//                    continue;
279
-//                }
280
-//                String controllerId = parts[1];
281
-//
282
-//                try {
283
-//                    sysrealtimeService.createrealtime(controllerId);
284
-//                } catch (Exception e) {
285
-//                    log.error("创建表失败: {} | {}", controllerId, e.getMessage(), e);
286
-//                    continue;
287
-//                }
288
-//
289
-//                String createTime = getStringValue(dataMap, "createTime");
290
-//                String timestamp = getStringValue(dataMap, "timestamp");
291
-//                String deviceId = getStringValue(dataMap, "device_id");
292
-//                if (createTime == null || timestamp == null || deviceId == null) {
293
-//                    continue;
294
-//                }
295
-//
296
-//                List<String> existingKeys = sysrealtimeService.selectAllKeys(controllerId);
297
-//                Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
298
-//
299
-//                for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
300
-//                    String fieldKey = entry.getKey().toString();
301
-//                    if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey)) {
302
-//                        continue;
303
-//                    }
304
-//                    String fieldValue = getStringValue(dataMap, fieldKey);
305
-//                    if (fieldValue == null) continue;
306
-//
307
-//                    if (existingKeySet.contains(fieldKey)) {
308
-//                        sysrealtimeService.updatetables(controllerId, createTime, fieldValue, timestamp, fieldKey, deviceId);
309
-//                    } else {
310
-//                        sysrealtimeService.inserttables(controllerId, createTime, deviceId, timestamp, fieldKey, fieldValue);
311
-//                    }
312
-//                }
313
-//            } catch (RedisConnectionFailureException e) {
314
-//                log.error("Redis 连接失败: {} | {}", redisKey, e.getMessage());
315
-//            } catch (DataAccessException e) {
316
-//                log.error("数据库操作失败: {} | {}", redisKey, e.getMessage(), e);
317
-//            } catch (Exception e) {
318
-//                log.error("同步设备失败: {} | {}", redisKey, e.getMessage(), e);
319
-//            }
320
-//        }
321
-//    }
322
-//
323
-//    private String getStringValue(Map<Object, Object> map, String key) {
324
-//        Object val = map.get(key);
325
-//        return val == null ? null : val.toString().trim();
326
-//    }
327
-}

+ 195
- 45
iot-platform/src/test/java/com/iot/platform/service/TDengineServiceTest.java Ver arquivo

@@ -83,69 +83,225 @@ class TDengineServiceTest {
83 83
     }
84 84
 
85 85
     @Test
86
-    @DisplayName("buildDynamicJson: 应排除 topic/ts/surfacename")
87
-    void buildDynamicJson_excludesReservedKeys() throws Exception {
88
-        Method method = TDengineService.class.getDeclaredMethod("buildDynamicJson", Map.class);
86
+    @DisplayName("getValueType: Boolean 返回 BOOL")
87
+    void getValueType_boolean_returnsBool() throws Exception {
88
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
89 89
         method.setAccessible(true);
90 90
 
91
-        Map<String, Object> data = new LinkedHashMap<>();
92
-        data.put("topic", "test/topic");
93
-        data.put("ts", "1234567890");
94
-        data.put("surfacename", "s1");
95
-        data.put("temperature", "25.5");
96
-        data.put("humidity", "60");
91
+        assertThat(method.invoke(tdengineService, Boolean.TRUE)).isEqualTo("BOOL");
92
+        assertThat(method.invoke(tdengineService, Boolean.FALSE)).isEqualTo("BOOL");
93
+    }
94
+
95
+    @Test
96
+    @DisplayName("getValueType: Integer 返回 BIGINT")
97
+    void getValueType_integer_returnsBigint() throws Exception {
98
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
99
+        method.setAccessible(true);
97 100
 
98
-        String result = (String) method.invoke(tdengineService, data);
99
-        assertThat(result).contains("temperature", "humidity");
100
-        assertThat(result).doesNotContain("topic", "ts", "surfacename");
101
+        assertThat(method.invoke(tdengineService, 123)).isEqualTo("BIGINT");
102
+        assertThat(method.invoke(tdengineService, Integer.valueOf(456))).isEqualTo("BIGINT");
101 103
     }
102 104
 
103 105
     @Test
104
-    @DisplayName("buildDynamicJson: 空值和空字符串应被过滤")
105
-    void buildDynamicJson_filtersEmptyValues() throws Exception {
106
-        Method method = TDengineService.class.getDeclaredMethod("buildDynamicJson", Map.class);
106
+    @DisplayName("getValueType: Long 返回 BIGINT")
107
+    void getValueType_long_returnsBigint() throws Exception {
108
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
107 109
         method.setAccessible(true);
108 110
 
109
-        Map<String, Object> data = new LinkedHashMap<>();
110
-        data.put("valid", "value");
111
-        data.put("empty", "");
112
-        data.put("nullVal", null);
111
+        assertThat(method.invoke(tdengineService, 123L)).isEqualTo("BIGINT");
112
+    }
113
+
114
+    @Test
115
+    @DisplayName("getValueType: Float 返回 DOUBLE")
116
+    void getValueType_float_returnsDouble() throws Exception {
117
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
118
+        method.setAccessible(true);
119
+
120
+        assertThat(method.invoke(tdengineService, 12.34f)).isEqualTo("DOUBLE");
121
+    }
122
+
123
+    @Test
124
+    @DisplayName("getValueType: Double 返回 DOUBLE")
125
+    void getValueType_double_returnsDouble() throws Exception {
126
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
127
+        method.setAccessible(true);
113 128
 
114
-        String result = (String) method.invoke(tdengineService, data);
115
-        assertThat(result).contains("valid");
116
-        assertThat(result).doesNotContain("empty", "nullVal");
129
+        assertThat(method.invoke(tdengineService, 56.78)).isEqualTo("DOUBLE");
130
+        assertThat(method.invoke(tdengineService, Double.valueOf(99.99))).isEqualTo("DOUBLE");
117 131
     }
118 132
 
119 133
     @Test
120
-    @DisplayName("buildDynamicJson: null 输入应返回空 JSON")
121
-    void buildDynamicJson_nullInput_returnsEmptyJson() throws Exception {
122
-        Method method = TDengineService.class.getDeclaredMethod("buildDynamicJson", Map.class);
134
+    @DisplayName("getValueType: java.util.Date 返回 TIMESTAMP")
135
+    void getValueType_date_returnsTimestamp() throws Exception {
136
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
123 137
         method.setAccessible(true);
124 138
 
125
-        assertThat(method.invoke(tdengineService, (Map<String, Object>) null)).isEqualTo("{}");
139
+        assertThat(method.invoke(tdengineService, new java.util.Date())).isEqualTo("TIMESTAMP");
126 140
     }
127 141
 
128 142
     @Test
129
-    @DisplayName("compressToBase64: 应压缩并返回非空字符串")
130
-    void compressToBase64_compressesData() throws Exception {
131
-        Method method = TDengineService.class.getDeclaredMethod("compressToBase64", String.class);
143
+    @DisplayName("getValueType: java.sql.Timestamp 返回 TIMESTAMP")
144
+    void getValueType_timestamp_returnsTimestamp() throws Exception {
145
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
132 146
         method.setAccessible(true);
133 147
 
134
-        String input = "{\"temperature\":25.5,\"humidity\":60}";
135
-        String result = (String) method.invoke(tdengineService, input);
148
+        assertThat(method.invoke(tdengineService, new java.sql.Timestamp(System.currentTimeMillis()))).isEqualTo("TIMESTAMP");
149
+    }
150
+
151
+    @Test
152
+    @DisplayName("getValueType: java.time.LocalDateTime 返回 TIMESTAMP")
153
+    void getValueType_localDateTime_returnsTimestamp() throws Exception {
154
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
155
+        method.setAccessible(true);
136 156
 
137
-        assertThat(result).isNotEmpty();
138
-        assertThat(result).isNotEqualTo(input); // should be compressed
157
+        assertThat(method.invoke(tdengineService, java.time.LocalDateTime.now())).isEqualTo("TIMESTAMP");
139 158
     }
140 159
 
141 160
     @Test
142
-    @DisplayName("compressToBase64: null 或空字符串应返回空")
143
-    void compressToBase64_nullOrEmpty_returnsEmpty() throws Exception {
144
-        Method method = TDengineService.class.getDeclaredMethod("compressToBase64", String.class);
161
+    @DisplayName("getValueType: java.time.Instant 返回 TIMESTAMP")
162
+    void getValueType_instant_returnsTimestamp() throws Exception {
163
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
145 164
         method.setAccessible(true);
146 165
 
147
-        assertThat(method.invoke(tdengineService, (String) null)).isEqualTo("");
148
-        assertThat(method.invoke(tdengineService, "")).isEqualTo("");
166
+        assertThat(method.invoke(tdengineService, java.time.Instant.now())).isEqualTo("TIMESTAMP");
167
+    }
168
+
169
+    @Test
170
+    @DisplayName("getValueType: String 返回 VARCHAR")
171
+    void getValueType_string_returnsVarchar() throws Exception {
172
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
173
+        method.setAccessible(true);
174
+
175
+        assertThat(method.invoke(tdengineService, "test")).isEqualTo("VARCHAR");
176
+        assertThat(method.invoke(tdengineService, "")).isEqualTo("VARCHAR");
177
+    }
178
+
179
+    @Test
180
+    @DisplayName("getValueType: null 返回 VARCHAR")
181
+    void getValueType_null_returnsVarchar() throws Exception {
182
+        Method method = TDengineService.class.getDeclaredMethod("getValueType", Object.class);
183
+        method.setAccessible(true);
184
+
185
+        assertThat(method.invoke(tdengineService, null)).isEqualTo("VARCHAR");
186
+    }
187
+
188
+    @Test
189
+    @DisplayName("getColumnTypeForDDL: BOOL/BIGINT/DOUBLE/TIMESTAMP 直接返回")
190
+    void getColumnTypeForDDL_directTypes() throws Exception {
191
+        Method method = TDengineService.class.getDeclaredMethod("getColumnTypeForDDL", String.class);
192
+        method.setAccessible(true);
193
+
194
+        assertThat(method.invoke(tdengineService, "BOOL")).isEqualTo("BOOL");
195
+        assertThat(method.invoke(tdengineService, "BIGINT")).isEqualTo("BIGINT");
196
+        assertThat(method.invoke(tdengineService, "DOUBLE")).isEqualTo("DOUBLE");
197
+        assertThat(method.invoke(tdengineService, "TIMESTAMP")).isEqualTo("TIMESTAMP");
198
+    }
199
+
200
+    @Test
201
+    @DisplayName("getColumnTypeForDDL: VARCHAR 返回 VARCHAR(32)")
202
+    void getColumnTypeForDDL_varchar_returnsWithLength() throws Exception {
203
+        Method method = TDengineService.class.getDeclaredMethod("getColumnTypeForDDL", String.class);
204
+        method.setAccessible(true);
205
+
206
+        assertThat(method.invoke(tdengineService, "VARCHAR")).isEqualTo("VARCHAR(32)");
207
+    }
208
+
209
+    @Test
210
+    @DisplayName("formatValue: null 返回 NULL")
211
+    void formatValue_null_returnsNull() throws Exception {
212
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
213
+        method.setAccessible(true);
214
+
215
+        assertThat(method.invoke(tdengineService, null, "col", "VARCHAR")).isEqualTo("NULL");
216
+    }
217
+
218
+    @Test
219
+    @DisplayName("formatValue: DOUBLE 类型直接返回数字字符串")
220
+    void formatValue_double_returnsNumberString() throws Exception {
221
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
222
+        method.setAccessible(true);
223
+
224
+        assertThat(method.invoke(tdengineService, 12.34, "col", "DOUBLE")).isEqualTo("12.34");
225
+    }
226
+
227
+    @Test
228
+    @DisplayName("formatValue: BIGINT 类型直接返回数字字符串")
229
+    void formatValue_bigint_returnsNumberString() throws Exception {
230
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
231
+        method.setAccessible(true);
232
+
233
+        assertThat(method.invoke(tdengineService, 123L, "col", "BIGINT")).isEqualTo("123");
234
+    }
235
+
236
+    @Test
237
+    @DisplayName("formatValue: BOOL 类型 true 返回 1")
238
+    void formatValue_bool_true_returns1() throws Exception {
239
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
240
+        method.setAccessible(true);
241
+
242
+        assertThat(method.invoke(tdengineService, true, "col", "BOOL")).isEqualTo("1");
243
+        assertThat(method.invoke(tdengineService, false, "col", "BOOL")).isEqualTo("0");
244
+    }
245
+
246
+    @Test
247
+    @DisplayName("formatValue: VARCHAR 类型超过32字符返回 NULL")
248
+    void formatValue_varchar_exceedsLength_returnsNull() throws Exception {
249
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
250
+        method.setAccessible(true);
251
+
252
+        String longStr = "12345678901234567890123456789012345"; // 35 chars
253
+        assertThat(method.invoke(tdengineService, longStr, "col", "VARCHAR")).isEqualTo("NULL");
254
+    }
255
+
256
+    @Test
257
+    @DisplayName("formatValue: VARCHAR 类型32字符内正常返回")
258
+    void formatValue_varchar_withinLength_returnsQuoted() throws Exception {
259
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
260
+        method.setAccessible(true);
261
+
262
+        assertThat(method.invoke(tdengineService, "test", "col", "VARCHAR")).isEqualTo("'test'");
263
+    }
264
+
265
+    @Test
266
+    @DisplayName("formatValue: VARCHAR 类型包含单引号应转义")
267
+    void formatValue_varchar_withQuote_escapes() throws Exception {
268
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
269
+        method.setAccessible(true);
270
+
271
+        assertThat(method.invoke(tdengineService, "it's", "col", "VARCHAR")).isEqualTo("'it''s'");
272
+    }
273
+
274
+    @Test
275
+    @DisplayName("formatValue: TIMESTAMP 类型 java.util.Date 转为毫秒")
276
+    void formatValue_timestamp_date_returnsMillis() throws Exception {
277
+        Method method = TDengineService.class.getDeclaredMethod("formatValue", Object.class, String.class, String.class);
278
+        method.setAccessible(true);
279
+
280
+        java.util.Date date = new java.util.Date(1234567890000L);
281
+        String result = (String) method.invoke(tdengineService, date, "col", "TIMESTAMP");
282
+        assertThat(result).isEqualTo("1234567890000");
283
+    }
284
+
285
+    @Test
286
+    @DisplayName("isReservedColumn: ts 和 surfacename 返回 true")
287
+    void isReservedColumn_reservedNames() throws Exception {
288
+        Method method = TDengineService.class.getDeclaredMethod("isReservedColumn", String.class);
289
+        method.setAccessible(true);
290
+
291
+        assertThat(method.invoke(tdengineService, "ts")).isEqualTo(true);
292
+        assertThat(method.invoke(tdengineService, "TS")).isEqualTo(true);
293
+        assertThat(method.invoke(tdengineService, "surfacename")).isEqualTo(true);
294
+        assertThat(method.invoke(tdengineService, "Surfacename")).isEqualTo(true);
295
+    }
296
+
297
+    @Test
298
+    @DisplayName("isReservedColumn: 其他列名返回 false")
299
+    void isReservedColumn_otherNames() throws Exception {
300
+        Method method = TDengineService.class.getDeclaredMethod("isReservedColumn", String.class);
301
+        method.setAccessible(true);
302
+
303
+        assertThat(method.invoke(tdengineService, "temperature")).isEqualTo(false);
304
+        assertThat(method.invoke(tdengineService, "voltage")).isEqualTo(false);
149 305
     }
150 306
 
151 307
     @Test
@@ -161,8 +317,6 @@ class TDengineServiceTest {
161 317
         Map<String, Object> data = new HashMap<>();
162 318
         data.put("key", "value");
163 319
 
164
-        // addToBatch 现在声明 throws SQLException,尝试连接 TDengine 会失败
165
-        // 本地环境缺少 TDengine 驱动时会抛出各种异常(UnsatisfiedLinkError / NoClassDefFoundError / IllegalStateException)
166 320
         assertThatThrownBy(() -> tdengineService.addToBatch("db", "table", data))
167 321
                 .isInstanceOf(Throwable.class);
168 322
     }
@@ -171,21 +325,18 @@ class TDengineServiceTest {
171 325
     @DisplayName("clearStableColumnCache: 应清空缓存不抛异常")
172 326
     void clearStableColumnCache_clearsWithoutError() {
173 327
         tdengineService.clearStableColumnCache();
174
-        // Should not throw
175 328
     }
176 329
 
177 330
     @Test
178 331
     @DisplayName("close: 应关闭数据源不抛异常")
179 332
     void close_closesDataSource() {
180 333
         tdengineService.close();
181
-        // Should not throw even if dataSource is null
182 334
     }
183 335
 
184 336
     @Test
185 337
     @DisplayName("closeConnection: null 连接应安全处理")
186 338
     void closeConnection_nullConnection_safe() {
187 339
         tdengineService.closeConnection(null);
188
-        // Should not throw
189 340
     }
190 341
 
191 342
     @Test
@@ -193,8 +344,7 @@ class TDengineServiceTest {
193 344
     void getConnection_initFails_throwsException() {
194 345
         when(tdengineConfig.getUrl()).thenReturn("jdbc:TAOS://invalid:6030/test");
195 346
 
196
-        // 本地环境缺少 TDengine 驱动时会抛出 NoClassDefFoundError / UnsatisfiedLinkError
197 347
         assertThatThrownBy(() -> tdengineService.getConnection())
198 348
                 .isInstanceOf(Throwable.class);
199 349
     }
200
-}
350
+}

+ 0
- 209
iot-platform/src/test/java/com/iot/platform/task/VehicleSyncTaskTest.java Ver arquivo

@@ -1,209 +0,0 @@
1
-package com.iot.platform.task;
2
-
3
-import com.iot.platform.config.IotProperties;
4
-import com.iot.platform.domain.SysCar;
5
-import com.iot.platform.domain.SysDevice;
6
-import com.iot.platform.service.*;
7
-import org.junit.jupiter.api.BeforeEach;
8
-import org.junit.jupiter.api.DisplayName;
9
-import org.junit.jupiter.api.Test;
10
-import org.junit.jupiter.api.extension.ExtendWith;
11
-import org.mockito.InjectMocks;
12
-import org.mockito.Mock;
13
-import org.mockito.junit.jupiter.MockitoExtension;
14
-import org.mockito.junit.jupiter.MockitoSettings;
15
-import org.mockito.quality.Strictness;
16
-import org.springframework.dao.DataAccessException;
17
-import org.springframework.data.redis.RedisConnectionFailureException;
18
-import org.springframework.data.redis.core.StringRedisTemplate;
19
-import org.springframework.data.redis.core.ValueOperations;
20
-import org.springframework.web.client.RestClientException;
21
-import org.springframework.web.client.RestTemplate;
22
-
23
-import java.util.*;
24
-
25
-import static org.assertj.core.api.Assertions.assertThat;
26
-import static org.mockito.ArgumentMatchers.*;
27
-import static org.mockito.Mockito.*;
28
-
29
-@ExtendWith(MockitoExtension.class)
30
-@MockitoSettings(strictness = Strictness.LENIENT)
31
-class VehicleSyncTaskTest {
32
-
33
-    @Mock
34
-    private SysCarService sysCarService;
35
-    @Mock
36
-    private SysDeviceService sysDeviceService;
37
-    @Mock
38
-    private StringRedisTemplate stringRedisTemplate;
39
-    @Mock
40
-    private SysrealtimeService sysrealtimeService;
41
-    @Mock
42
-    private SysDeviceVoService sysDeviceVoService;
43
-    @Mock
44
-    private SysDeviceControlService sysDeviceControlService;
45
-    @Mock
46
-    private SysWorkorderService sysWorkorderService;
47
-    @Mock
48
-    private SysIndicatorsService sysIndicatorsService;
49
-
50
-    @Mock
51
-    private RestTemplate restTemplate;
52
-    @Mock
53
-    private IotProperties iotProperties;
54
-    @Mock
55
-    private IotProperties.Mqtt mqttConfig;
56
-
57
-    @InjectMocks
58
-    private VehicleSyncTask task;
59
-
60
-    @Mock
61
-    private ValueOperations<String, String> valueOps;
62
-
63
-    @BeforeEach
64
-    void setUp() {
65
-        when(stringRedisTemplate.opsForValue()).thenReturn(valueOps);
66
-        when(iotProperties.getMqtt()).thenReturn(mqttConfig);
67
-        when(mqttConfig.getVehicleTriggerUrl()).thenReturn("https://esos-iot.com:9443/syscar/trigger");
68
-    }
69
-
70
-    @Test
71
-    @DisplayName("updateSysCar: 获取锁失败时应跳过执行")
72
-    void updateSysCar_lockFail_skipsExecution() {
73
-        when(valueOps.setIfAbsent(anyString(), eq("1"), anyLong(), any())).thenReturn(false);
74
-
75
-        task.updateSysCar();
76
-
77
-        verify(sysCarService, never()).selectcontrollerId();
78
-    }
79
-
80
-    @Test
81
-    @DisplayName("updateSysCar: 获取锁成功时应执行车辆位置更新")
82
-    void updateSysCar_lockSuccess_executesUpdate() {
83
-        when(valueOps.setIfAbsent(anyString(), eq("1"), anyLong(), any())).thenReturn(true);
84
-
85
-        SysCar car = new SysCar();
86
-        car.setCarId("1");
87
-        car.setControllerId("CTRL001");
88
-        when(sysCarService.selectcontrollerId()).thenReturn(Collections.singletonList(car));
89
-
90
-        SysDevice lat = new SysDevice();
91
-        lat.setV("31.2304");
92
-        SysDevice lon = new SysDevice();
93
-        lon.setV("121.4737");
94
-        when(sysDeviceService.selectsysdevice("CTRL001", "纬度")).thenReturn(lat);
95
-        when(sysDeviceService.selectsysdevice("CTRL001", "经度")).thenReturn(lon);
96
-
97
-        when(stringRedisTemplate.opsForHash()).thenReturn(mock(org.springframework.data.redis.core.HashOperations.class));
98
-        when(stringRedisTemplate.delete(anyString())).thenReturn(true);
99
-
100
-        task.updateSysCar();
101
-
102
-        verify(sysCarService).selectcontrollerId();
103
-    }
104
-
105
-    @Test
106
-    @DisplayName("updateSysCar: 单条记录异常时不应中断整个批次")
107
-    void updateSysCar_singleRecordException_continuesBatch() {
108
-        when(valueOps.setIfAbsent(anyString(), eq("1"), anyLong(), any())).thenReturn(true);
109
-
110
-        SysCar car1 = new SysCar();
111
-        car1.setCarId("1");
112
-        car1.setControllerId("CTRL001");
113
-        SysCar car2 = new SysCar();
114
-        car2.setCarId("2");
115
-        car2.setControllerId("CTRL002");
116
-        when(sysCarService.selectcontrollerId()).thenReturn(Arrays.asList(car1, car2));
117
-
118
-        // car1 正常
119
-        SysDevice lat1 = new SysDevice();
120
-        lat1.setV("31.2304");
121
-        SysDevice lon1 = new SysDevice();
122
-        lon1.setV("121.4737");
123
-        when(sysDeviceService.selectsysdevice("CTRL001", "纬度")).thenReturn(lat1);
124
-        when(sysDeviceService.selectsysdevice("CTRL001", "经度")).thenReturn(lon1);
125
-
126
-        // car2 抛异常
127
-        when(sysDeviceService.selectsysdevice("CTRL002", "纬度"))
128
-                .thenThrow(new DataAccessException("DB error") {});
129
-
130
-        when(stringRedisTemplate.opsForHash()).thenReturn(mock(org.springframework.data.redis.core.HashOperations.class));
131
-        when(stringRedisTemplate.delete(anyString())).thenReturn(true);
132
-
133
-        task.updateSysCar();
134
-
135
-        // car1 正常执行,car2 异常被捕获,两者都应该尝试
136
-        verify(sysDeviceService).selectsysdevice("CTRL001", "纬度");
137
-        verify(sysDeviceService).selectsysdevice("CTRL002", "纬度");
138
-    }
139
-
140
-    @Test
141
-    @DisplayName("insertDevice: Redis 连接失败时应跳过执行")
142
-    void insertDevice_redisConnectionFailure_skipsGracefully() {
143
-        when(valueOps.setIfAbsent(anyString(), eq("1"), anyLong(), any())).thenReturn(true);
144
-        when(stringRedisTemplate.opsForSet()).thenThrow(
145
-                new RedisConnectionFailureException("Redis down"));
146
-        when(stringRedisTemplate.delete(anyString())).thenReturn(true);
147
-
148
-        task.insertDevice();
149
-
150
-        verify(sysDeviceControlService, never()).selectdevice(anyString());
151
-    }
152
-
153
-    @Test
154
-    @DisplayName("insertDevice: 空数据时应直接返回")
155
-    void insertDevice_emptyData_returnsEarly() {
156
-        when(valueOps.setIfAbsent(anyString(), eq("1"), anyLong(), any())).thenReturn(true);
157
-
158
-        org.springframework.data.redis.core.SetOperations setOps = mock(org.springframework.data.redis.core.SetOperations.class);
159
-        when(stringRedisTemplate.opsForSet()).thenReturn(setOps);
160
-        when(setOps.members("DSB:active:devices")).thenReturn(Collections.emptySet());
161
-        when(stringRedisTemplate.delete(anyString())).thenReturn(true);
162
-
163
-        task.insertDevice();
164
-
165
-        verify(sysDeviceControlService, never()).selectdevice(anyString());
166
-    }
167
-
168
-    @Test
169
-    @DisplayName("syncRedisToMySQL: 空活跃 key 时应直接返回")
170
-    void syncRedisToMySQL_emptyKeys_returnsEarly() {
171
-        when(valueOps.setIfAbsent(anyString(), eq("1"), anyLong(), any())).thenReturn(true);
172
-
173
-        org.springframework.data.redis.core.SetOperations setOps = mock(org.springframework.data.redis.core.SetOperations.class);
174
-        when(stringRedisTemplate.opsForSet()).thenReturn(setOps);
175
-        when(setOps.members("DSB:active:devices")).thenReturn(null);
176
-        when(stringRedisTemplate.delete(anyString())).thenReturn(true);
177
-
178
-        task.syncRedisToMySQL();
179
-
180
-        verify(sysrealtimeService, never()).createrealtime(anyString());
181
-    }
182
-
183
-
184
-    @Test
185
-    @DisplayName("webhook 调用失败时不应中断主流程")
186
-    void updateCarPosition_webhookFailure_continues() {
187
-        when(valueOps.setIfAbsent(anyString(), eq("1"), anyLong(), any())).thenReturn(true);
188
-
189
-        SysCar car = new SysCar();
190
-        car.setCarId("1");
191
-        car.setControllerId("CTRL001");
192
-        when(sysCarService.selectcontrollerId()).thenReturn(Collections.singletonList(car));
193
-
194
-        SysDevice lat = new SysDevice();
195
-        lat.setV("31.2304");
196
-        SysDevice lon = new SysDevice();
197
-        lon.setV("121.4737");
198
-        when(sysDeviceService.selectsysdevice("CTRL001", "纬度")).thenReturn(lat);
199
-        when(sysDeviceService.selectsysdevice("CTRL001", "经度")).thenReturn(lon);
200
-
201
-        when(stringRedisTemplate.opsForHash()).thenReturn(mock(org.springframework.data.redis.core.HashOperations.class));
202
-        when(stringRedisTemplate.delete(anyString())).thenReturn(true);
203
-        when(restTemplate.postForObject(anyString(), isNull(), eq(String.class)))
204
-                .thenThrow(new RestClientException("Connection refused"));
205
-
206
-        // 不应抛异常
207
-        task.updateSysCar();
208
-    }
209
-}

+ 4
- 6
pom.xml Ver arquivo

@@ -4,16 +4,14 @@
4 4
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 5
 	<modelVersion>4.0.0</modelVersion>
6 6
 
7
-    <groupId>com.ruoyi</groupId>
8
-    <artifactId>ruoyi</artifactId>
9
-    <version>3.9.0</version>
7
+    <groupId>com.iot.platform</groupId>
8
+    <artifactId>wisdom-data</artifactId>
9
+    <version>1.0.0</version>
10 10
 
11
-    <name>iot-platform</name>
12
-    <url>http://www.ruoyi.vip</url>
11
+    <name>wisdom-data</name>
13 12
     <description>IoT储能运营平台</description>
14 13
 
15 14
     <properties>
16
-        <ruoyi.version>3.9.0</ruoyi.version>
17 15
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
18 16
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
19 17
         <java.version>1.8</java.version>

Carregando…
Cancelar
Salvar