|
|
@@ -9,13 +9,9 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|
9
|
9
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
10
|
10
|
import org.springframework.stereotype.Service;
|
|
11
|
11
|
|
|
12
|
|
-import java.io.ByteArrayOutputStream;
|
|
13
|
12
|
import java.sql.*;
|
|
14
|
13
|
import java.util.*;
|
|
15
|
14
|
import java.util.concurrent.*;
|
|
16
|
|
-import java.util.zip.*;
|
|
17
|
|
-
|
|
18
|
|
-import com.fasterxml.jackson.databind.ObjectMapper;
|
|
19
|
15
|
|
|
20
|
16
|
@Service("tdengineService")
|
|
21
|
17
|
public class TDengineService {
|
|
|
@@ -34,15 +30,15 @@ public class TDengineService {
|
|
34
|
30
|
private HikariDataSource dataSource;
|
|
35
|
31
|
private volatile boolean dataSourceInitialized = false;
|
|
36
|
32
|
|
|
37
|
|
- // === 新增:缓存超级表结构 (key = dbName.stableName) ===
|
|
|
33
|
+ // === 缓存超级表结构 (key = dbName.stableName) ===
|
|
38
|
34
|
private final Map<String, Set<String>> stableColumnCache = new ConcurrentHashMap<>();
|
|
39
|
35
|
private static final int MAX_CACHE_SIZE = 1000;
|
|
40
|
36
|
|
|
41
|
|
- // JSON 列名,用于存储所有动态字段
|
|
42
|
|
- private static final String JSON_COLUMN_NAME = "ext_data";
|
|
|
37
|
+ // 允许的列名字符
|
|
|
38
|
+ private static final String ALLOWED_COLUMNS = "^[a-zA-Z_][a-zA-Z0-9_]*$";
|
|
43
|
39
|
|
|
44
|
|
- // ObjectMapper 线程安全,可复用
|
|
45
|
|
- private static final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
40
|
+ // TDengine 超级表最大列数限制
|
|
|
41
|
+ private static final int MAX_COLUMNS_PER_STABLE = 4096;
|
|
46
|
42
|
|
|
47
|
43
|
private synchronized void initDataSource() {
|
|
48
|
44
|
if (dataSourceInitialized) {
|
|
|
@@ -72,7 +68,7 @@ public class TDengineService {
|
|
72
|
68
|
}
|
|
73
|
69
|
|
|
74
|
70
|
/**
|
|
75
|
|
- * 手动获取连接
|
|
|
71
|
+ * 获取连接
|
|
76
|
72
|
*/
|
|
77
|
73
|
public Connection getConnection() throws SQLException {
|
|
78
|
74
|
if (!dataSourceInitialized) {
|
|
|
@@ -86,7 +82,7 @@ public class TDengineService {
|
|
86
|
82
|
}
|
|
87
|
83
|
|
|
88
|
84
|
/**
|
|
89
|
|
- * 手动关闭连接
|
|
|
85
|
+ * 关闭连接
|
|
90
|
86
|
*/
|
|
91
|
87
|
public void closeConnection(Connection conn) {
|
|
92
|
88
|
if (conn != null) {
|
|
|
@@ -111,7 +107,7 @@ public class TDengineService {
|
|
111
|
107
|
}
|
|
112
|
108
|
|
|
113
|
109
|
private boolean isValidFieldName(String name) {
|
|
114
|
|
- return name != null && name.matches("^[a-zA-Z0-9_]+$");
|
|
|
110
|
+ return name != null && name.matches(ALLOWED_COLUMNS);
|
|
115
|
111
|
}
|
|
116
|
112
|
|
|
117
|
113
|
// === 缓存工具方法 ===
|
|
|
@@ -129,15 +125,12 @@ public class TDengineService {
|
|
129
|
125
|
return cached;
|
|
130
|
126
|
}
|
|
131
|
127
|
|
|
132
|
|
- // 缓存未命中,查 DB
|
|
133
|
128
|
Set<String> columns = loadStableColumnsFromDB(dbName, stableName);
|
|
134
|
|
- if (!columns.isEmpty()) {
|
|
135
|
|
- if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
|
|
136
|
|
- stableColumnCache.clear();
|
|
137
|
|
- log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
|
|
138
|
|
- }
|
|
139
|
|
- stableColumnCache.put(key, columns);
|
|
|
129
|
+ if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
|
|
|
130
|
+ stableColumnCache.clear();
|
|
|
131
|
+ log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
|
|
140
|
132
|
}
|
|
|
133
|
+ stableColumnCache.put(key, columns);
|
|
141
|
134
|
return columns;
|
|
142
|
135
|
}
|
|
143
|
136
|
|
|
|
@@ -151,7 +144,7 @@ public class TDengineService {
|
|
151
|
144
|
try {
|
|
152
|
145
|
conn = getConnection();
|
|
153
|
146
|
stmt = conn.createStatement();
|
|
154
|
|
- stmt.setQueryTimeout(5); // ⭐ 防止无限等待
|
|
|
147
|
+ stmt.setQueryTimeout(5);
|
|
155
|
148
|
rs = stmt.executeQuery(sql);
|
|
156
|
149
|
while (rs.next()) {
|
|
157
|
150
|
columns.add(rs.getString(1));
|
|
|
@@ -169,7 +162,7 @@ public class TDengineService {
|
|
169
|
162
|
}
|
|
170
|
163
|
|
|
171
|
164
|
// ==========================================
|
|
172
|
|
- // 初始化表结构
|
|
|
165
|
+ // 初始化表结构(按列存储,无 ext_data)
|
|
173
|
166
|
// ==========================================
|
|
174
|
167
|
private void initTableStructure(String dbName, String supertablename, String table, Set<String> fieldNames) throws SQLException {
|
|
175
|
168
|
Connection conn = null;
|
|
|
@@ -181,10 +174,9 @@ public class TDengineService {
|
|
181
|
174
|
|
|
182
|
175
|
stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS " + wrapName(dbName));
|
|
183
|
176
|
|
|
184
|
|
- // 创建超级表:固定 ts + surfacename + ext_data(VARCHAR)
|
|
185
|
|
- // 注:TDengine 2.x JSON 类型只能用于 TAGS,不能用于普通列
|
|
|
177
|
+ // 创建超级表:固定 ts + surfacename,无 ext_data 列
|
|
186
|
178
|
String stableSql = String.format(
|
|
187
|
|
- "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64), ext_data VARCHAR(8192)) TAGS (location BINARY(64))",
|
|
|
179
|
+ "CREATE STABLE IF NOT EXISTS %s.%s (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location BINARY(64))",
|
|
188
|
180
|
wrapName(dbName),
|
|
189
|
181
|
wrapName(supertablename)
|
|
190
|
182
|
);
|
|
|
@@ -192,16 +184,12 @@ public class TDengineService {
|
|
192
|
184
|
|
|
193
|
185
|
// 更新缓存:固定列
|
|
194
|
186
|
String key = getStableKey(dbName, supertablename);
|
|
195
|
|
- if (stableColumnCache.size() >= MAX_CACHE_SIZE) {
|
|
196
|
|
- stableColumnCache.clear();
|
|
197
|
|
- log.warn("TDengine 超级表缓存已达上限({}),已清空", MAX_CACHE_SIZE);
|
|
198
|
|
- }
|
|
199
|
187
|
Set<String> fixedCols = new HashSet<>();
|
|
200
|
188
|
fixedCols.add("ts");
|
|
201
|
189
|
fixedCols.add("surfacename");
|
|
202
|
|
- fixedCols.add("ext_data");
|
|
203
|
190
|
stableColumnCache.put(key, fixedCols);
|
|
204
|
191
|
|
|
|
192
|
+ // 创建子表
|
|
205
|
193
|
String tableSql = String.format(
|
|
206
|
194
|
"CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS ('%s')",
|
|
207
|
195
|
wrapName(dbName),
|
|
|
@@ -219,17 +207,17 @@ public class TDengineService {
|
|
219
|
207
|
}
|
|
220
|
208
|
|
|
221
|
209
|
// ==========================================
|
|
222
|
|
- // 批量插入
|
|
|
210
|
+ // 批量插入(按列存储)
|
|
223
|
211
|
// ==========================================
|
|
224
|
212
|
public void insertBatch(String dbName, String table, List<Map<String, Object>> dataList) throws SQLException {
|
|
225
|
213
|
if (dataList == null || dataList.isEmpty()) return;
|
|
226
|
214
|
|
|
227
|
215
|
String supertablename = table.contains("_") ? table.substring(0, table.lastIndexOf('_')) : table;
|
|
228
|
216
|
|
|
229
|
|
- // 初始化表结构(固定为 ts + surfacename + ext_data)
|
|
|
217
|
+ // 初始化表结构
|
|
230
|
218
|
initTableStructure(dbName, supertablename, table, Collections.emptySet());
|
|
231
|
219
|
|
|
232
|
|
- // 分批处理,每批最多 50 条记录(避免 SQL 过长)
|
|
|
220
|
+ // 分批处理,每批最多 50 条记录
|
|
233
|
221
|
int batchSize = 50;
|
|
234
|
222
|
int totalBatches = (dataList.size() + batchSize - 1) / batchSize;
|
|
235
|
223
|
|
|
|
@@ -248,23 +236,65 @@ public class TDengineService {
|
|
248
|
236
|
* 内部方法:插入一批数据
|
|
249
|
237
|
*/
|
|
250
|
238
|
private void insertBatchInternal(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) throws SQLException {
|
|
251
|
|
- // 确保表存在(可能有竞态条件)
|
|
252
|
239
|
ensureTableExists(dbName, supertablename, table);
|
|
253
|
240
|
|
|
|
241
|
+ // 收集所有动态列名及其数据类型
|
|
|
242
|
+ Map<String, String> columnTypes = new LinkedHashMap<>();
|
|
|
243
|
+ for (Map<String, Object> data : dataList) {
|
|
|
244
|
+ if (data == null) continue;
|
|
|
245
|
+ for (Map.Entry<String, Object> entry : data.entrySet()) {
|
|
|
246
|
+ String key = entry.getKey();
|
|
|
247
|
+ if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
|
|
|
248
|
+ columnTypes.put(key, getValueType(entry.getValue()));
|
|
|
249
|
+ }
|
|
|
250
|
+ }
|
|
|
251
|
+ }
|
|
|
252
|
+
|
|
|
253
|
+ // 获取当前列数
|
|
|
254
|
+ Set<String> existingColumns = getStableColumns(dbName, supertablename);
|
|
|
255
|
+ int totalColumns = existingColumns.size() + columnTypes.size();
|
|
|
256
|
+
|
|
|
257
|
+ // 检查列数是否超限
|
|
|
258
|
+ if (totalColumns > MAX_COLUMNS_PER_STABLE) {
|
|
|
259
|
+ log.error("超级表总列数({})超过限制({}),跳过本次插入 | 表: {}.{}",
|
|
|
260
|
+ totalColumns, MAX_COLUMNS_PER_STABLE, dbName, table);
|
|
|
261
|
+ return;
|
|
|
262
|
+ }
|
|
|
263
|
+
|
|
|
264
|
+ // 确保所有列都存在(自动 ALTER 添加新列)
|
|
|
265
|
+ ensureColumnsExist(dbName, supertablename, columnTypes);
|
|
|
266
|
+
|
|
|
267
|
+ // 重新获取列数以确保一致性
|
|
|
268
|
+ existingColumns = getStableColumns(dbName, supertablename);
|
|
|
269
|
+ if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
|
|
|
270
|
+ log.error("超级表列数已达上限({}),停止插入数据 | 表: {}.{}",
|
|
|
271
|
+ MAX_COLUMNS_PER_STABLE, dbName, table);
|
|
|
272
|
+ return;
|
|
|
273
|
+ }
|
|
|
274
|
+
|
|
|
275
|
+ // 构建 SQL
|
|
254
|
276
|
StringBuilder sqlBuilder = new StringBuilder();
|
|
255
|
277
|
sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
|
|
256
|
|
- .append(" (ts, surfacename, ext_data) VALUES ");
|
|
|
278
|
+ .append(" (ts, surfacename");
|
|
|
279
|
+
|
|
|
280
|
+ for (String col : columnTypes.keySet()) {
|
|
|
281
|
+ sqlBuilder.append(", ").append(wrapName(col));
|
|
|
282
|
+ }
|
|
|
283
|
+ sqlBuilder.append(") VALUES ");
|
|
257
|
284
|
|
|
258
|
285
|
boolean hasData = false;
|
|
259
|
286
|
for (Map<String, Object> data : dataList) {
|
|
260
|
287
|
if (data == null) continue;
|
|
261
|
288
|
|
|
262
|
|
- // 将动态字段转为 JSON 并 GZIP 压缩
|
|
263
|
|
- String extJson = buildDynamicJson(data);
|
|
264
|
|
- String compressed = compressToBase64(extJson);
|
|
|
289
|
+ sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("'");
|
|
265
|
290
|
|
|
266
|
|
- sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
|
|
267
|
|
- .append(escapeValue(compressed)).append("'),");
|
|
|
291
|
+ for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
|
|
|
292
|
+ String col = entry.getKey();
|
|
|
293
|
+ String colType = entry.getValue();
|
|
|
294
|
+ Object value = data.get(col);
|
|
|
295
|
+ sqlBuilder.append(", ").append(formatValue(value, col, colType));
|
|
|
296
|
+ }
|
|
|
297
|
+ sqlBuilder.append("),");
|
|
268
|
298
|
hasData = true;
|
|
269
|
299
|
}
|
|
270
|
300
|
|
|
|
@@ -281,7 +311,6 @@ public class TDengineService {
|
|
281
|
311
|
stmt.setQueryTimeout(30);
|
|
282
|
312
|
stmt.executeUpdate(finalSql);
|
|
283
|
313
|
} catch (SQLException e) {
|
|
284
|
|
- // 表不存在时尝试重建表后重试
|
|
285
|
314
|
if (e.getMessage().contains("Table does not exist")) {
|
|
286
|
315
|
log.warn("表不存在,重建表: {}", table);
|
|
287
|
316
|
initTableStructure(dbName, supertablename, table, Collections.emptySet());
|
|
|
@@ -296,20 +325,44 @@ public class TDengineService {
|
|
296
|
325
|
}
|
|
297
|
326
|
|
|
298
|
327
|
/**
|
|
299
|
|
- * 重试插入(表重建后)
|
|
|
328
|
+ * 重试插入
|
|
300
|
329
|
*/
|
|
301
|
330
|
private void insertBatchRetry(String dbName, String supertablename, String table, List<Map<String, Object>> dataList) throws SQLException {
|
|
|
331
|
+ Map<String, String> columnTypes = new LinkedHashMap<>();
|
|
|
332
|
+ for (Map<String, Object> data : dataList) {
|
|
|
333
|
+ if (data == null) continue;
|
|
|
334
|
+ for (Map.Entry<String, Object> entry : data.entrySet()) {
|
|
|
335
|
+ String key = entry.getKey();
|
|
|
336
|
+ if (isValidFieldName(key) && !isReservedColumn(key) && !columnTypes.containsKey(key)) {
|
|
|
337
|
+ columnTypes.put(key, getValueType(entry.getValue()));
|
|
|
338
|
+ }
|
|
|
339
|
+ }
|
|
|
340
|
+ }
|
|
|
341
|
+
|
|
|
342
|
+ ensureColumnsExist(dbName, supertablename, columnTypes);
|
|
|
343
|
+
|
|
302
|
344
|
StringBuilder sqlBuilder = new StringBuilder();
|
|
303
|
345
|
sqlBuilder.append("INSERT INTO ").append(wrapName(dbName)).append(".").append(wrapName(table))
|
|
304
|
|
- .append(" (ts, surfacename, ext_data) VALUES ");
|
|
|
346
|
+ .append(" (ts, surfacename");
|
|
|
347
|
+
|
|
|
348
|
+ for (String col : columnTypes.keySet()) {
|
|
|
349
|
+ sqlBuilder.append(", ").append(wrapName(col));
|
|
|
350
|
+ }
|
|
|
351
|
+ sqlBuilder.append(") VALUES ");
|
|
305
|
352
|
|
|
306
|
353
|
boolean hasData = false;
|
|
307
|
354
|
for (Map<String, Object> data : dataList) {
|
|
308
|
355
|
if (data == null) continue;
|
|
309
|
|
- String extJson = buildDynamicJson(data);
|
|
310
|
|
- String compressed = compressToBase64(extJson);
|
|
311
|
|
- sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("', '")
|
|
312
|
|
- .append(escapeValue(compressed)).append("'),");
|
|
|
356
|
+
|
|
|
357
|
+ sqlBuilder.append("(NOW(), '").append(escapeValue(supertablename)).append("'");
|
|
|
358
|
+
|
|
|
359
|
+ for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
|
|
|
360
|
+ String col = entry.getKey();
|
|
|
361
|
+ String colType = entry.getValue();
|
|
|
362
|
+ Object value = data.get(col);
|
|
|
363
|
+ sqlBuilder.append(", ").append(formatValue(value, col, colType));
|
|
|
364
|
+ }
|
|
|
365
|
+ sqlBuilder.append("),");
|
|
313
|
366
|
hasData = true;
|
|
314
|
367
|
}
|
|
315
|
368
|
|
|
|
@@ -334,7 +387,166 @@ public class TDengineService {
|
|
334
|
387
|
}
|
|
335
|
388
|
|
|
336
|
389
|
/**
|
|
337
|
|
- * 确保表存在(检查+创建)
|
|
|
390
|
+ * 判断是否为保留列(ts, surfacename)
|
|
|
391
|
+ */
|
|
|
392
|
+ private boolean isReservedColumn(String columnName) {
|
|
|
393
|
+ return "ts".equalsIgnoreCase(columnName) || "surfacename".equalsIgnoreCase(columnName);
|
|
|
394
|
+ }
|
|
|
395
|
+
|
|
|
396
|
+ /**
|
|
|
397
|
+ * 判断值类型,返回 TDengine 对应的数据类型
|
|
|
398
|
+ */
|
|
|
399
|
+ private String getValueType(Object value) {
|
|
|
400
|
+ if (value == null) {
|
|
|
401
|
+ return "VARCHAR";
|
|
|
402
|
+ }
|
|
|
403
|
+ if (value instanceof Boolean) {
|
|
|
404
|
+ return "BOOL";
|
|
|
405
|
+ }
|
|
|
406
|
+ if (value instanceof Integer || value instanceof Long) {
|
|
|
407
|
+ return "BIGINT";
|
|
|
408
|
+ }
|
|
|
409
|
+ if (value instanceof Float || value instanceof Double) {
|
|
|
410
|
+ return "DOUBLE";
|
|
|
411
|
+ }
|
|
|
412
|
+ // 时间类型(Date, Timestamp, LocalDateTime 等)
|
|
|
413
|
+ if (value instanceof java.util.Date || value instanceof java.sql.Timestamp
|
|
|
414
|
+ || value instanceof java.time.LocalDateTime || value instanceof java.time.Instant) {
|
|
|
415
|
+ return "TIMESTAMP";
|
|
|
416
|
+ }
|
|
|
417
|
+ return "VARCHAR";
|
|
|
418
|
+ }
|
|
|
419
|
+
|
|
|
420
|
+ /**
|
|
|
421
|
+ * 根据 TDengine 列类型获取创建列的 SQL 类型
|
|
|
422
|
+ */
|
|
|
423
|
+ private String getColumnTypeForDDL(String tdType, String columnName) {
|
|
|
424
|
+ // 特殊列名使用较长长度
|
|
|
425
|
+ if ("device_id".equalsIgnoreCase(columnName)) {
|
|
|
426
|
+ return "VARCHAR(100)";
|
|
|
427
|
+ }
|
|
|
428
|
+ switch (tdType) {
|
|
|
429
|
+ case "BOOL":
|
|
|
430
|
+ case "BIGINT":
|
|
|
431
|
+ case "DOUBLE":
|
|
|
432
|
+ case "TIMESTAMP":
|
|
|
433
|
+ return tdType;
|
|
|
434
|
+ case "VARCHAR":
|
|
|
435
|
+ default:
|
|
|
436
|
+ return "VARCHAR(38)";
|
|
|
437
|
+ }
|
|
|
438
|
+ }
|
|
|
439
|
+
|
|
|
440
|
+ /**
|
|
|
441
|
+ * 格式化值
|
|
|
442
|
+ */
|
|
|
443
|
+ private String formatValue(Object value, String columnName, String columnType) {
|
|
|
444
|
+ if (value == null) {
|
|
|
445
|
+ return "NULL";
|
|
|
446
|
+ }
|
|
|
447
|
+ String strValue = value.toString();
|
|
|
448
|
+
|
|
|
449
|
+ // 数字类型不做长度校验
|
|
|
450
|
+ if ("DOUBLE".equals(columnType) || "BIGINT".equals(columnType) || "BOOL".equals(columnType)) {
|
|
|
451
|
+ if (value instanceof Boolean) {
|
|
|
452
|
+ return (Boolean) value ? "1" : "0";
|
|
|
453
|
+ }
|
|
|
454
|
+ return strValue;
|
|
|
455
|
+ }
|
|
|
456
|
+
|
|
|
457
|
+ // TIMESTAMP 类型处理
|
|
|
458
|
+ if ("TIMESTAMP".equals(columnType)) {
|
|
|
459
|
+ if (value instanceof java.util.Date) {
|
|
|
460
|
+ return String.valueOf(((java.util.Date) value).getTime());
|
|
|
461
|
+ }
|
|
|
462
|
+ if (value instanceof java.sql.Timestamp) {
|
|
|
463
|
+ return String.valueOf(((java.sql.Timestamp) value).getTime());
|
|
|
464
|
+ }
|
|
|
465
|
+ if (value instanceof java.time.LocalDateTime) {
|
|
|
466
|
+ return String.valueOf(((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.of("+8")).toEpochMilli());
|
|
|
467
|
+ }
|
|
|
468
|
+ if (value instanceof java.time.Instant) {
|
|
|
469
|
+ return String.valueOf(((java.time.Instant) value).toEpochMilli());
|
|
|
470
|
+ }
|
|
|
471
|
+ }
|
|
|
472
|
+
|
|
|
473
|
+ // 字符串类型校验长度
|
|
|
474
|
+ int maxLen = "device_id".equalsIgnoreCase(columnName) ? 100 : 38;
|
|
|
475
|
+ if (strValue.length() > maxLen) {
|
|
|
476
|
+ log.warn("字段值超长,跳过存储 | 列: {} | 值长度: {} | 最大: {} | 前{}字符: {}",
|
|
|
477
|
+ columnName, strValue.length(), maxLen, maxLen, strValue.substring(0, maxLen));
|
|
|
478
|
+ return "NULL";
|
|
|
479
|
+ }
|
|
|
480
|
+ return "'" + escapeValue(strValue) + "'";
|
|
|
481
|
+ }
|
|
|
482
|
+
|
|
|
483
|
+ /**
|
|
|
484
|
+ * 确保列存在,如有新列则 ALTER 添加(根据类型添加对应列)
|
|
|
485
|
+ */
|
|
|
486
|
+ private void ensureColumnsExist(String dbName, String supertablename, Map<String, String> columnTypes) throws SQLException {
|
|
|
487
|
+ Set<String> existingColumns = getStableColumns(dbName, supertablename);
|
|
|
488
|
+
|
|
|
489
|
+ // 检查是否已达到列数上限
|
|
|
490
|
+ if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
|
|
|
491
|
+ log.error("超级表列数已达上限({}),停止添加新列 | 表: {}.{}", MAX_COLUMNS_PER_STABLE, dbName, supertablename);
|
|
|
492
|
+ return;
|
|
|
493
|
+ }
|
|
|
494
|
+
|
|
|
495
|
+ Connection conn = null;
|
|
|
496
|
+ Statement stmt = null;
|
|
|
497
|
+ try {
|
|
|
498
|
+ conn = getConnection();
|
|
|
499
|
+ stmt = conn.createStatement();
|
|
|
500
|
+ stmt.setQueryTimeout(10);
|
|
|
501
|
+
|
|
|
502
|
+ for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
|
|
|
503
|
+ String col = entry.getKey();
|
|
|
504
|
+ String colType = entry.getValue();
|
|
|
505
|
+
|
|
|
506
|
+ // 再次检查列数上限
|
|
|
507
|
+ if (existingColumns.size() >= MAX_COLUMNS_PER_STABLE) {
|
|
|
508
|
+ log.error("超级表列数已达上限({}),跳过添加列 | 列: {} | 表: {}.{}",
|
|
|
509
|
+ MAX_COLUMNS_PER_STABLE, col, dbName, supertablename);
|
|
|
510
|
+ continue;
|
|
|
511
|
+ }
|
|
|
512
|
+
|
|
|
513
|
+ if (!existingColumns.contains(col)) {
|
|
|
514
|
+ String ddlType = getColumnTypeForDDL(colType, col);
|
|
|
515
|
+ String alterSql = String.format(
|
|
|
516
|
+ "ALTER STABLE %s.%s ADD COLUMN %s %s",
|
|
|
517
|
+ wrapName(dbName),
|
|
|
518
|
+ wrapName(supertablename),
|
|
|
519
|
+ wrapName(col),
|
|
|
520
|
+ ddlType
|
|
|
521
|
+ );
|
|
|
522
|
+ try {
|
|
|
523
|
+ stmt.executeUpdate(alterSql);
|
|
|
524
|
+ log.info("超级表添加列: {}.{} -> {} ({})", dbName, supertablename, col, ddlType);
|
|
|
525
|
+ existingColumns.add(col);
|
|
|
526
|
+ } catch (SQLException e) {
|
|
|
527
|
+ if (e.getMessage().contains("already exists") || e.getMessage().contains("duplicate")) {
|
|
|
528
|
+ existingColumns.add(col);
|
|
|
529
|
+ } else if (e.getMessage().contains("too many columns")) {
|
|
|
530
|
+ log.error("超级表列数超限,无法添加列 | 列: {} | 表: {}.{} | 错误: {}",
|
|
|
531
|
+ col, dbName, supertablename, e.getMessage());
|
|
|
532
|
+ } else {
|
|
|
533
|
+ throw e;
|
|
|
534
|
+ }
|
|
|
535
|
+ }
|
|
|
536
|
+
|
|
|
537
|
+ // 更新缓存
|
|
|
538
|
+ String key = getStableKey(dbName, supertablename);
|
|
|
539
|
+ stableColumnCache.put(key, existingColumns);
|
|
|
540
|
+ }
|
|
|
541
|
+ }
|
|
|
542
|
+ } finally {
|
|
|
543
|
+ if (stmt != null) try { stmt.close(); } catch (SQLException ignored) {}
|
|
|
544
|
+ closeConnection(conn);
|
|
|
545
|
+ }
|
|
|
546
|
+ }
|
|
|
547
|
+
|
|
|
548
|
+ /**
|
|
|
549
|
+ * 确保表存在
|
|
338
|
550
|
*/
|
|
339
|
551
|
private void ensureTableExists(String dbName, String supertablename, String table) throws SQLException {
|
|
340
|
552
|
Connection conn = null;
|
|
|
@@ -345,7 +557,7 @@ public class TDengineService {
|
|
345
|
557
|
stmt = conn.createStatement();
|
|
346
|
558
|
stmt.setQueryTimeout(5);
|
|
347
|
559
|
|
|
348
|
|
- // 检查超级表是否存在(使用 information_schema)
|
|
|
560
|
+ // 检查超级表是否存在
|
|
349
|
561
|
String checkStableSql = String.format(
|
|
350
|
562
|
"SELECT * FROM information_schema.ins_tables WHERE table_name = '%s' AND db_name = '%s'",
|
|
351
|
563
|
escapeValue(supertablename), escapeValue(dbName));
|
|
|
@@ -390,47 +602,6 @@ public class TDengineService {
|
|
390
|
602
|
}
|
|
391
|
603
|
}
|
|
392
|
604
|
|
|
393
|
|
- /**
|
|
394
|
|
- * 将动态字段构建为 JSON 字符串(排除 topic、ts、surfacename)
|
|
395
|
|
- */
|
|
396
|
|
- private String buildDynamicJson(Map<String, Object> data) {
|
|
397
|
|
- if (data == null) return "{}";
|
|
398
|
|
- Map<String, Object> dynamic = new LinkedHashMap<>();
|
|
399
|
|
- for (Map.Entry<String, Object> entry : data.entrySet()) {
|
|
400
|
|
- String key = entry.getKey();
|
|
401
|
|
- if ("topic".equalsIgnoreCase(key) || "ts".equalsIgnoreCase(key)
|
|
402
|
|
- || "surfacename".equalsIgnoreCase(key)) {
|
|
403
|
|
- continue;
|
|
404
|
|
- }
|
|
405
|
|
- Object value = entry.getValue();
|
|
406
|
|
- if (value != null && !value.toString().trim().isEmpty()) {
|
|
407
|
|
- dynamic.put(key, value);
|
|
408
|
|
- }
|
|
409
|
|
- }
|
|
410
|
|
- try {
|
|
411
|
|
- return objectMapper.writeValueAsString(dynamic);
|
|
412
|
|
- } catch (Exception e) {
|
|
413
|
|
- throw new IllegalStateException("JSON 序列化失败,数据可能丢失: " + dynamic, e);
|
|
414
|
|
- }
|
|
415
|
|
- }
|
|
416
|
|
-
|
|
417
|
|
- /**
|
|
418
|
|
- * GZIP 压缩字符串
|
|
419
|
|
- */
|
|
420
|
|
- private String compressToBase64(String data) {
|
|
421
|
|
- if (data == null || data.isEmpty()) return "";
|
|
422
|
|
- try {
|
|
423
|
|
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
|
424
|
|
- try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
|
|
425
|
|
- gzip.write(data.getBytes("UTF-8"));
|
|
426
|
|
- }
|
|
427
|
|
- byte[] compressed = Base64.getEncoder().encode(bos.toByteArray());
|
|
428
|
|
- return new String(compressed);
|
|
429
|
|
- } catch (Exception e) {
|
|
430
|
|
- throw new IllegalStateException("GZIP 压缩失败: " + e.getMessage(), e);
|
|
431
|
|
- }
|
|
432
|
|
- }
|
|
433
|
|
-
|
|
434
|
605
|
@Deprecated
|
|
435
|
606
|
public void addToBatch(String dbName, String table, Map<String, Object> dataMap) throws SQLException {
|
|
436
|
607
|
List<Map<String, Object>> list = new ArrayList<>();
|
|
|
@@ -438,9 +609,9 @@ public class TDengineService {
|
|
438
|
609
|
insertBatch(dbName, table, list);
|
|
439
|
610
|
}
|
|
440
|
611
|
|
|
441
|
|
- // ==========================================
|
|
442
|
|
- // 可选:清除缓存(用于运维或测试)
|
|
443
|
|
- // ==========================================
|
|
|
612
|
+ /**
|
|
|
613
|
+ * 清除缓存
|
|
|
614
|
+ */
|
|
444
|
615
|
public void clearStableColumnCache() {
|
|
445
|
616
|
stableColumnCache.clear();
|
|
446
|
617
|
log.info("清除了 TDengine 超级表结构缓存");
|
|
|
@@ -448,7 +619,6 @@ public class TDengineService {
|
|
448
|
619
|
|
|
449
|
620
|
public void close() {
|
|
450
|
621
|
log.info("关闭 TDengine 服务...");
|
|
451
|
|
- // batchExecutor 由 Spring 管理生命周期,不在这里 shutdown
|
|
452
|
622
|
if (dataSource != null) {
|
|
453
|
623
|
dataSource.close();
|
|
454
|
624
|
}
|