Ver código fonte

超级表传输

mqy20260511
humanleft 2 semanas atrás
pai
commit
216f58c8b6

+ 1
- 1
iot-platform/pom.xml Ver arquivo

@@ -198,7 +198,7 @@
198 198
                         </configuration>
199 199
                     </execution>
200 200
                 </executions>
201
-            </plugin>
201
+            </plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin>
202 202
         </plugins>
203 203
         <finalName>${project.artifactId}</finalName>
204 204
     </build>

+ 1
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Ver arquivo

@@ -73,6 +73,6 @@ public class MqttChargeStationConsumer extends AbstractDynamicMqttConsumer {
73 73
         String superTable = topicParts[3];
74 74
         LocalDate date = LocalDate.now();
75 75
         String tableName = superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
76
-        tdengineService.insertBatch(dbName, tableName, batchToInsert);
76
+        tdengineService.insertBatch(dbName, superTable,tableName, batchToInsert);
77 77
     }
78 78
 }

+ 1
- 1
iot-platform/src/main/java/com/iot/platform/mqtt/MqttDynamicConsumer.java Ver arquivo

@@ -110,7 +110,7 @@ public class MqttDynamicConsumer extends AbstractDynamicMqttConsumer {
110 110
         String tableName = ctx.superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
111 111
 
112 112
         List<Map<String, Object>> batch = Collections.singletonList(data);
113
-        tdengineService.insertBatch(ctx.dbName, tableName, batch);
113
+        tdengineService.insertBatch(ctx.dbName, ctx.superTable,tableName, batch);
114 114
     }
115 115
 
116 116
     private void writeToRedis(MessageContext ctx) {

+ 1
- 0
iot-platform/src/main/java/com/iot/platform/service/TdEngineAlarm.java Ver arquivo

@@ -43,6 +43,7 @@ public class TdEngineAlarm {
43 43
      * 记录告警数据到 TdEngine 时序数据库
44 44
      */
45 45
     public void recordFailure(Map<String, Object> data, String superTableName, String table) throws SQLException {
46
+
46 47
         validateNames(superTableName, table);
47 48
 
48 49
         List<String> validKeys = data.keySet().stream()

+ 3
- 3
iot-platform/src/main/java/com/iot/platform/service/TdEngineService.java Ver arquivo

@@ -225,7 +225,7 @@ public class TdEngineService {
225 225
     // ==========================================
226 226
     // 批量插入(按列存储)
227 227
     // ==========================================
228
-    public void insertBatch(String dbName, String table, List<Map<String, Object>> dataList)
228
+    public void insertBatch(String dbName, String superTable,String table, List<Map<String, Object>> dataList)
229 229
             throws SQLException {
230 230
 
231 231
         if (dataList == null || dataList.isEmpty()) {
@@ -233,12 +233,12 @@ public class TdEngineService {
233 233
             return;
234 234
         }
235 235
 
236
-        String superTableName = extractSuperTableName(table);
236
+//        String superTableName = extractSuperTableName(table);
237 237
 
238 238
         int batchSize = DEFAULT_BATCH_SIZE;
239 239
         for (int i = 0; i < dataList.size(); i += batchSize) {
240 240
             List<Map<String, Object>> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size()));
241
-            insertBatchInternal(dbName, superTableName, table, batch);
241
+            insertBatchInternal(dbName, superTable, table, batch);
242 242
         }
243 243
 
244 244
         log.info("批量写入成功: {} | 条数: {}", table, dataList.size());

+ 3
- 3
iot-platform/src/test/java/com/iot/platform/mqtt/MqttChargeStationConsumerTest.java Ver arquivo

@@ -101,7 +101,7 @@ class MqttChargeStationConsumerTest {
101 101
 
102 102
         method.invoke(consumer, dataList, topic);
103 103
 
104
-        verify(tdengineService).insertBatch(eq("myDb"), eq(expectedTable), anyList());
104
+        verify(tdengineService).insertBatch(eq("myDb"), "mySuperTable_",eq(expectedTable), anyList());
105 105
     }
106 106
 
107 107
     @Test
@@ -113,7 +113,7 @@ class MqttChargeStationConsumerTest {
113 113
 
114 114
         method.invoke(consumer, Collections.emptyList(), "prefix/db/suffix/table/extra");
115 115
 
116
-        verify(tdengineService, never()).insertBatch(anyString(), anyString(), anyList());
116
+        verify(tdengineService, never()).insertBatch(anyString(), "mySuperTable_",anyString(), anyList());
117 117
     }
118 118
 
119 119
     @Test
@@ -151,6 +151,6 @@ class MqttChargeStationConsumerTest {
151 151
 
152 152
         method.invoke(consumer, dataList, topic);
153 153
 
154
-        verify(tdengineService).insertBatch(eq("myDb"), eq(expectedTable), argThat(list -> list.size() == 1));
154
+        verify(tdengineService).insertBatch(eq("myDb"), "mySuperTable_",eq(expectedTable), argThat(list -> list.size() == 1));
155 155
     }
156 156
 }

+ 2
- 2
iot-platform/src/test/java/com/iot/platform/service/TDengineServiceTest.java Ver arquivo

@@ -305,8 +305,8 @@ class TDengineServiceTest {
305 305
     @Test
306 306
     @DisplayName("insertBatch: 空列表应直接返回不抛异常")
307 307
     void insertBatch_emptyList_returnsWithoutError() throws Exception {
308
-        tdengineService.insertBatch("db", "table", Collections.emptyList());
309
-        tdengineService.insertBatch("db", "table", null);
308
+        tdengineService.insertBatch("db", "supertable","table", Collections.emptyList());
309
+        tdengineService.insertBatch("db", "supertable","table", null);
310 310
     }
311 311
 
312 312
     @Test

Carregando…
Cancelar
Salvar