Просмотр исходного кода

refactor: merge previous session changes

Resolve merge state from prior work session.
mqy20260511
humanleft 4 дней назад
Родитель
Сommit
9913e8ca31

+ 40
- 2
iot-platform/src/main/java/com/iot/platform/service/SysrealtimeService.java Просмотреть файл

@@ -2,20 +2,58 @@ package com.iot.platform.service;
2 2
 
3 3
 import com.iot.platform.mapper.SysrealtimeMapper;
4 4
 import org.apache.ibatis.annotations.Param;
5
+import org.slf4j.Logger;
6
+import org.slf4j.LoggerFactory;
5 7
 import org.springframework.stereotype.Service;
6 8
 
7 9
 import javax.annotation.Resource;
10
+import java.sql.Connection;
11
+import java.sql.DriverManager;
12
+import java.sql.SQLException;
8 13
 import java.util.List;
9 14
 
10 15
 @Service
11 16
 public class SysrealtimeService {
17
+    private static final Logger log = LoggerFactory.getLogger(SysrealtimeService.class);
12 18
 
13 19
     @Resource
14 20
     public SysrealtimeMapper sysrealtimeMapper;
15 21
 
16
-    public void createrealtime(String tableName){
17
-        sysrealtimeMapper.createrealtime(tableName);
22
+    public void createrealtime(String tableName) {
23
+        log.info("准备创建表: [{}]", tableName);
24
+        log.info("执行建表SQL前,先验证数据库连接...");
25
+
26
+        // 直接用JDBC测试连接和建表
27
+        try (Connection conn = getConnection()) {
28
+            String sql = "CREATE TABLE IF NOT EXISTS `" + tableName + "` (" +
29
+                    "create_time VARCHAR(255) NOT NULL COMMENT '时间戳'," +
30
+                    "device_id VARCHAR(255) NOT NULL COMMENT '设备id'," +
31
+                    "timestamp VARCHAR(255) NOT NULL COMMENT '时间戳'," +
32
+                    "k VARCHAR(255) NOT NULL COMMENT 'key'," +
33
+                    "v VARCHAR(255) NOT NULL COMMENT '值'," +
34
+                    "INDEX idx_device_id (device_id)," +
35
+                    "INDEX idx_device_create_time (device_id, create_time)," +
36
+                    "INDEX idx_k (k)" +
37
+                    ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='实时数据信息'";
38
+            log.info("SQL: {}", sql);
39
+            conn.createStatement().executeUpdate(sql);
40
+            log.info("表[{}]创建成功", tableName);
41
+        } catch (Exception e) {
42
+            log.error("JDBC建表[{}]失败: {}", tableName, e.getMessage());
43
+            Throwable cause = e;
44
+            while (cause.getCause() != null) cause = cause.getCause();
45
+            log.error("根因: {}", cause.getMessage());
46
+            throw new RuntimeException("建表失败", e);
47
+        }
48
+    }
49
+
50
+    private Connection getConnection() throws SQLException {
51
+        String url = "jdbc:mysql://47.104.204.180:3306/data?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8";
52
+        String user = "root";
53
+        String password = "Zhu059300()__";
54
+        return DriverManager.getConnection(url, user, password);
18 55
     }
56
+
19 57
     public List<String> selecttables(){
20 58
         return sysrealtimeMapper.selecttables();
21 59
     }

+ 194
- 101
iot-platform/src/main/java/com/iot/platform/service/TDengineService.java Просмотреть файл

@@ -8,9 +8,13 @@ import org.slf4j.LoggerFactory;
8 8
 import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.stereotype.Service;
10 10
 
11
+import java.io.ByteArrayOutputStream;
11 12
 import java.sql.*;
12 13
 import java.util.*;
13 14
 import java.util.concurrent.*;
15
+import java.util.zip.*;
16
+
17
+import com.fasterxml.jackson.databind.ObjectMapper;
14 18
 
15 19
 @Service
16 20
 public class TDengineService {
@@ -37,6 +41,12 @@ public class TDengineService {
37 41
     // === 新增:缓存超级表结构 (key = dbName.stableName) ===
38 42
     private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
39 43
 
44
+    // JSON 列名,用于存储所有动态字段
45
+    private static final String JSON_COLUMN_NAME = "ext_data";
46
+
47
+    // ObjectMapper 线程安全,可复用
48
+    private static final ObjectMapper objectMapper = new ObjectMapper();
49
+
40 50
     public TDengineService() {
41 51
         // 延迟初始化:不在构造器中创建连接池,避免 TDengine 本地库缺失时阻断启动
42 52
     }
@@ -162,70 +172,6 @@ public class TDengineService {
162 172
     }
163 173
 
164 174
     // ==========================================
165
-    // 新增列(成功后更新缓存)
166
-    // ==========================================
167
-    private boolean addColumnToStable(String dbName, String stableName, String columnName) {
168
-        String sql = String.format("ALTER TABLE %s.%s ADD COLUMN %s VARCHAR(50)",
169
-                wrapName(dbName), wrapName(stableName), wrapName(columnName));
170
-
171
-        Connection conn = null;
172
-        Statement stmt = null;
173
-        try {
174
-            conn = getConnection();
175
-            stmt = conn.createStatement();
176
-            stmt.setQueryTimeout(10);
177
-            stmt.executeUpdate(sql);
178
-            log.info("✅ 新增列成功: {}.{}.{}", dbName, stableName, columnName);
179
-
180
-            // ✅ 更新缓存
181
-            String key = getStableKey(dbName, stableName);
182
-            stableColumnCache.computeIfPresent(key, (k, existingCols) -> {
183
-                Set<String> newCols = new HashSet<>(existingCols);
184
-                newCols.add(columnName);
185
-                return newCols;
186
-            });
187
-
188
-            return true;
189
-        } catch (SQLException e) {
190
-            if (e.getMessage().contains("already exists")) {
191
-                // 列已存在,确保缓存包含它(防御性)
192
-                String key = getStableKey(dbName, stableName);
193
-                stableColumnCache.computeIfPresent(key, (k, cols) -> {
194
-                    if (!cols.contains(columnName)) {
195
-                        Set<String> updated = new HashSet<>(cols);
196
-                        updated.add(columnName);
197
-                        return updated;
198
-                    }
199
-                    return cols;
200
-                });
201
-                return true;
202
-            }
203
-            if (e.getMessage().contains("Row length exceeds") || e.getMessage().contains("too many columns")) {
204
-                log.warn("⚠️ 跳过新增列 (超限): {}", columnName);
205
-                return false;
206
-            }
207
-            log.error("❌ 新增列失败: {}", e.getMessage());
208
-            return false;
209
-        } finally {
210
-            if (stmt != null) try { stmt.close(); } catch (SQLException ignored) {}
211
-            closeConnection(conn);
212
-        }
213
-    }
214
-
215
-//    private Map<String, String> filterEmptyFields(Map<String, Object> fieldValues) {
216
-//        Map<String, String> filtered = new HashMap<>();
217
-//        for (Map.Entry<String, Object> entry : fieldValues.entrySet()) {
218
-//            if (entry.getValue() != null) {
219
-//                String val = entry.getValue().toString().trim();
220
-//                if (!val.isEmpty()) {
221
-//                    filtered.put(entry.getKey(), escapeValue(val));
222
-//                }
223
-//            }
224
-//        }
225
-//        return filtered;
226
-//    }
227
-
228
-    // ==========================================
229 175
     // 初始化表结构
230 176
     // ==========================================
231 177
     private boolean initTableStructure(String dbName, String supertablename, String table, Set<String> fieldNames) {
@@ -238,20 +184,22 @@ public class TDengineService {
238 184
 
239 185
             stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName));
240 186
 
187
+            // 创建超级表:固定 ts + surfacename + ext_data(VARCHAR)
188
+            // 注:TDengine 2.x JSON 类型只能用于 TAGS,不能用于普通列
241 189
             String stableSql = String.format(
242
-                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location BINARY(64))",
190
+                    "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64), ext_data VARCHAR(8192)) TAGS (location BINARY(64))",
243 191
                     wrapName(dbName),
244 192
                     wrapName(supertablename)
245 193
             );
246 194
             stmt.executeUpdate(stableSql);
247 195
 
248
-            Set<String> existing = getStableColumns(dbName, supertablename);
249
-            for (String field : fieldNames) {
250
-                if ("ts".equalsIgnoreCase(field) || "surfacename".equalsIgnoreCase(field)) continue;
251
-                if (!existing.contains(field)) {
252
-                    addColumnToStable(dbName, supertablename, field);
253
-                }
254
-            }
196
+            // 更新缓存:固定列
197
+            String key = getStableKey(dbName, supertablename);
198
+            Set<String> fixedCols = new HashSet<>();
199
+            fixedCols.add("ts");
200
+            fixedCols.add("surfacename");
201
+            fixedCols.add("ext_data");
202
+            stableColumnCache.put(key, fixedCols);
255 203
 
256 204
             String tableSql = String.format(
257 205
                     "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
@@ -282,44 +230,94 @@ public class TDengineService {
282 230
 
283 231
         String supertablename = table.contains("_") ? table.substring(0, table.lastIndexOf('_')) : table;
284 232
 
285
-        Set<String> allFields = new HashSet<>();
286
-        for (Map<String, Object> data : dataList) {
287
-            if (data != null) {
288
-                for (String key : data.keySet()) {
289
-                    if (!"topic".equalsIgnoreCase(key)) allFields.add(key);
290
-                }
233
+        // 初始化表结构(固定为 ts + surfacename + ext_data)
234
+        initTableStructure(dbName, supertablename, table, Collections.emptySet());
235
+
236
+        // 分批处理,每批最多 50 条记录(避免 SQL 过长)
237
+        int batchSize = 50;
238
+        int totalBatches = (dataList.size() + batchSize - 1) / batchSize;
239
+
240
+        for (int i = 0; i < totalBatches; i++) {
241
+            int start = i * batchSize;
242
+            int end = Math.min(start + batchSize, dataList.size());
243
+            List<Map<String, Object>> batch = dataList.subList(start, end);
244
+
245
+            if (!insertBatchInternal(dbName, supertablename, table, batch)) {
246
+                return false;
291 247
             }
292 248
         }
293
-        allFields.remove("ts");
294
-        allFields.remove("surfacename");
295 249
 
296
-        initTableStructure(dbName, supertablename, table, allFields);
250
+        log.info("✅ 批量写入成功: {} | 条数: {}", table, dataList.size());
251
+        return true;
252
+    }
253
+
254
+    /**
255
+     * 内部方法:插入一批数据
256
+     */
257
+    private boolean insertBatchInternal(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) {
258
+        // 确保表存在(可能有竞态条件)
259
+        ensureTableExists(dbName, supertablename, table);
297 260
 
298 261
         StringBuilder sqlBuilder = new StringBuilder();
299 262
         sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
300
-                .append(" (ts, surfacename");
301
-
302
-        List<String> colList = new ArrayList<>();
303
-        for (String col : allFields) {
304
-            if (isValidFieldName(col)) {
305
-                colList.add(col);
306
-                sqlBuilder.append(", ").append(wrapName(col));
307
-            } else {
308
-                log.warn("跳过非法字段名: {}", col);
263
+                .append(" (ts, surfacename, ext_data) VALUES ");
264
+
265
+        boolean hasData = false;
266
+        for (Map<String, Object> data : dataList) {
267
+            if (data == null) continue;
268
+
269
+            // 将动态字段转为 JSON 并 GZIP 压缩
270
+            String extJson = buildDynamicJson(data);
271
+            String compressed = compressToBase64(extJson);
272
+
273
+            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
274
+                    .append(escapeValue(compressed)).append("'),");
275
+            hasData = true;
276
+        }
277
+
278
+        if (!hasData) return true;
279
+
280
+        sqlBuilder.setLength(sqlBuilder.length() - 1);
281
+        String finalSql = sqlBuilder.toString();
282
+
283
+        Connection conn = null;
284
+        Statement stmt = null;
285
+        try {
286
+            conn = getConnection();
287
+            stmt = conn.createStatement();
288
+            stmt.setQueryTimeout(30);
289
+            stmt.executeUpdate(finalSql);
290
+            return true;
291
+        } catch (SQLException e) {
292
+            // 表不存在时尝试重建表后重试
293
+            if (e.getMessage().contains("Table does not exist")) {
294
+                log.warn("⚠️ 表不存在,重建表: {}", table);
295
+                initTableStructure(dbName, supertablename, table, Collections.emptySet());
296
+                return insertBatchRetry(dbName, supertablename, table, dataList);
309 297
             }
298
+            log.error("❌ 批量写入 SQL 失败: {} | 错误: {}", table, e.getMessage());
299
+            return false;
300
+        } finally {
301
+            if (stmt != null) try { stmt.close(); } catch (SQLException ignored) {}
302
+            closeConnection(conn);
310 303
         }
311
-        sqlBuilder.append(") VALUES ");
304
+    }
305
+
306
+    /**
307
+     * 重试插入(表重建后)
308
+     */
309
+    private boolean insertBatchRetry(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) {
310
+        StringBuilder sqlBuilder = new StringBuilder();
311
+        sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
312
+                .append(" (ts, surfacename, ext_data) VALUES ");
312 313
 
313 314
         boolean hasData = false;
314 315
         for (Map<String, Object> data : dataList) {
315 316
             if (data == null) continue;
316
-            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("'");
317
-            for (String col : colList) {
318
-                Object v = data.get(col);
319
-                String val = (v == null) ? "" : escapeValue(v.toString());
320
-                sqlBuilder.append(", '").append(val).append("'");
321
-            }
322
-            sqlBuilder.append("),");
317
+            String extJson = buildDynamicJson(data);
318
+            String compressed = compressToBase64(extJson);
319
+            sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
320
+                    .append(escapeValue(compressed)).append("'),");
323 321
             hasData = true;
324 322
         }
325 323
 
@@ -335,17 +333,112 @@ public class TDengineService {
335 333
             stmt = conn.createStatement();
336 334
             stmt.setQueryTimeout(30);
337 335
             stmt.executeUpdate(finalSql);
338
-            log.info("✅ 批量写入成功: {} | 条数: {}", table, dataList.size());
339 336
             return true;
340 337
         } catch (SQLException e) {
341
-            log.error("❌ 批量写入 SQL 失败: {} | 错误: {}", table, e.getMessage());
342
-            throw e;
338
+            log.error("❌ 重试插入失败: {} | 错误: {}", table, e.getMessage());
339
+            return false;
343 340
         } finally {
344 341
             if (stmt != null) try { stmt.close(); } catch (SQLException ignored) {}
345 342
             closeConnection(conn);
346 343
         }
347 344
     }
348 345
 
346
+    /**
347
+     * 确保表存在(检查+创建)
348
+     */
349
+    private void ensureTableExists(String dbName, String supertablename, String table) {
350
+        Connection conn = null;
351
+        Statement stmt = null;
352
+        try {
353
+            conn = getConnection();
354
+            stmt = conn.createStatement();
355
+            stmt.setQueryTimeout(5);
356
+
357
+            // 检查超级表是否存在(使用 information_schema)
358
+            String checkStableSql = String.format(
359
+                    "SELECT * FROM information_schema.ins_tables WHERE table_name = '%s' AND db_name = '%s'",
360
+                    escapeValue(supertablename), escapeValue(dbName));
361
+            ResultSet rs = stmt.executeQuery(checkStableSql);
362
+            boolean stableExists = rs.next();
363
+            rs.close();
364
+
365
+            if (!stableExists) {
366
+                log.info("超级表不存在,创建: {}.{}", dbName, supertablename);
367
+                initTableStructure(dbName, supertablename, table, Collections.emptySet());
368
+                return;
369
+            }
370
+
371
+            // 检查子表是否存在
372
+            String checkTableSql = String.format(
373
+                    "SELECT * FROM information_schema.ins_tables WHERE table_name = '%s' AND db_name = '%s' AND table_type = 'CHILD_TABLE'",
374
+                    escapeValue(table), escapeValue(dbName));
375
+            rs = stmt.executeQuery(checkTableSql);
376
+            boolean tableExists = rs.next();
377
+            rs.close();
378
+
379
+            if (!tableExists) {
380
+                String tableSql = String.format(
381
+                        "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
382
+                        wrapName(dbName),
383
+                        wrapName(table),
384
+                        wrapName(dbName),
385
+                        wrapName(supertablename),
386
+                        escapeValue(supertablename)
387
+                );
388
+                stmt.executeUpdate(tableSql);
389
+                log.info("✅ 子表创建成功: {}", table);
390
+            }
391
+        } catch (SQLException e) {
392
+            log.warn("检查表存在性失败,继续尝试插入: {}", e.getMessage());
393
+        } finally {
394
+            if (stmt != null) try { stmt.close(); } catch (SQLException ignored) {}
395
+            closeConnection(conn);
396
+        }
397
+    }
398
+
399
+    /**
400
+     * 将动态字段构建为 JSON 字符串(排除 topic、ts、surfacename)
401
+     */
402
+    private String buildDynamicJson(Map<String, Object> data) {
403
+        if (data == null) return "{}";
404
+        Map<String, Object> dynamic = new LinkedHashMap<>();
405
+        for (Map.Entry<String, Object> entry : data.entrySet()) {
406
+            String key = entry.getKey();
407
+            if ("topic".equalsIgnoreCase(key) || "ts".equalsIgnoreCase(key)
408
+                    || "surfacename".equalsIgnoreCase(key)) {
409
+                continue;
410
+            }
411
+            Object value = entry.getValue();
412
+            if (value != null && !value.toString().trim().isEmpty()) {
413
+                dynamic.put(key, value);
414
+            }
415
+        }
416
+        try {
417
+            return objectMapper.writeValueAsString(dynamic);
418
+        } catch (Exception e) {
419
+            log.warn("JSON 序列化失败,返回空对象: {}", e.getMessage());
420
+            return "{}";
421
+        }
422
+    }
423
+
424
+    /**
425
+     * GZIP 压缩字符串
426
+     */
427
+    private String compressToBase64(String data) {
428
+        if (data == null || data.isEmpty()) return "";
429
+        try {
430
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
431
+            GZIPOutputStream gzip = new GZIPOutputStream(bos);
432
+            gzip.write(data.getBytes("UTF-8"));
433
+            gzip.close();
434
+            byte[] compressed = Base64.getEncoder().encode(bos.toByteArray());
435
+            return new String(compressed);
436
+        } catch (Exception e) {
437
+            log.warn("GZIP 压缩失败,使用原始数据: {}", e.getMessage());
438
+            return data;
439
+        }
440
+    }
441
+
349 442
     @Deprecated
350 443
     public boolean addToBatch(String dbName, String table, Map<String, Object> dataMap) {
351 444
         List<Map<String, Object>> list = new ArrayList<>();

+ 9
- 0
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Просмотреть файл

@@ -178,6 +178,15 @@ public class VehicleSyncTask {
178 178
                     continue;
179 179
                 }
180 180
                 String controllerId = parts[1];
181
+                try {
182
+                    sysrealtimeService.createrealtime(controllerId);
183
+                } catch (Exception e) {
184
+                    Throwable cause = e;
185
+                    while (cause.getCause() != null) cause = cause.getCause();
186
+                    log.error("创建表失败: {} | {} | {}", controllerId, e.getMessage(), cause.getMessage());
187
+                    e.printStackTrace();
188
+                    continue;
189
+                }
181 190
                 String createTime = getStringValue(dataMap, "createTime");
182 191
                 String timestamp = getStringValue(dataMap, "timestamp");
183 192
                 String deviceId = getStringValue(dataMap, "device_id");

+ 1
- 0
iot-platform/src/main/resources/application.yml Просмотреть файл

@@ -14,6 +14,7 @@ server:
14 14
 logging:
15 15
   level:
16 16
     com.iot.platform: debug
17
+    com.iot.platform.mapper: trace
17 18
     org.springframework: warn
18 19
 
19 20
 # Spring配置

+ 5
- 7
iot-platform/src/main/resources/mapper/SysrealtimeMapper.xml Просмотреть файл

@@ -18,12 +18,10 @@
18 18
             timestamp VARCHAR(255) NOT NULL COMMENT '时间戳',
19 19
             k VARCHAR(255) NOT NULL COMMENT 'key',
20 20
             v VARCHAR(255) NOT NULL COMMENT '值',
21
-            -- 单独索引
22
-            INDEX device_id (device_id),
23
-            -- 联合索引(适合同时根据device_id和create_time查询的场景)
24
-            INDEX device_id_create_time (device_id, create_time)
25
-            INDEX k (k)
26
-            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='实时数据信息';
21
+            INDEX idx_device_id (device_id),
22
+            INDEX idx_device_create_time (device_id, create_time),
23
+            INDEX idx_k (k)
24
+        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='实时数据信息';
27 25
     </update>
28 26
 
29 27
     <select id="selecttables" resultType="String">
@@ -39,4 +37,4 @@
39 37
     <select id="selectkey" resultType="Integer">
40 38
         select COUNT(*) from `${tableName}` where k=#{k}
41 39
     </select>
42
-</mapper>
40
+</mapper>

Загрузка…
Отмена
Сохранить