Просмотр исходного кода

refactor(mqtt): 优化 MqttChargeStationConsumer 并补充单元测试

- 移除冗余的 deepCopyMap 调用(JSON 解析结果已是新对象)
- 空数据列表提前返回,避免无效的 topic 解析
- 简化空 map 过滤逻辑,消除误导性变量名 'list'
- 新增 4 个单元测试覆盖正常写入、空数据、无效 topic、过滤 null/空 map 场景
mqy20260511
humanleft 2 недель назад
Родитель
Сommit
2acb0f92d8

+ 13
- 10
iot-platform/src/main/java/com/iot/platform/mqtt/MqttChargeStationConsumer.java Просмотреть файл

46
     }
46
     }
47
 
47
 
48
     private void processMessageAndWriteToTdEngine(List<Map<String, Object>> dataList, String topic) throws Exception {
48
     private void processMessageAndWriteToTdEngine(List<Map<String, Object>> dataList, String topic) throws Exception {
49
+        if (dataList == null || dataList.isEmpty()) {
50
+            return;
51
+        }
49
 
52
 
50
         String[] topicParts = topic.split("/");
53
         String[] topicParts = topic.split("/");
51
         if (topicParts.length < 4) {
54
         if (topicParts.length < 4) {
54
 
57
 
55
         List<Map<String, Object>> batchToInsert = new ArrayList<>();
58
         List<Map<String, Object>> batchToInsert = new ArrayList<>();
56
         for (Map<String, Object> dataMap : dataList) {
59
         for (Map<String, Object> dataMap : dataList) {
57
-            if (dataMap == null || dataMap.isEmpty()) {
58
-                continue;
60
+            if (dataMap != null && !dataMap.isEmpty()) {
61
+                batchToInsert.add(dataMap);
59
             }
62
             }
60
-            Map<String, Object> list = deepCopyMap(dataMap);
61
-            batchToInsert.add(list);
62
         }
63
         }
63
 
64
 
64
-        if (!batchToInsert.isEmpty()) {
65
-            String dbName = topicParts[1];
66
-            String superTable = topicParts[3];
67
-            LocalDate date = LocalDate.now();
68
-            String tableName = superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
69
-            tdengineService.insertBatch(dbName, tableName, batchToInsert);
65
+        if (batchToInsert.isEmpty()) {
66
+            return;
70
         }
67
         }
68
+
69
+        String dbName = topicParts[1];
70
+        String superTable = topicParts[3];
71
+        LocalDate date = LocalDate.now();
72
+        String tableName = superTable + "_" + date.getYear() + String.format("%02d", date.getMonthValue());
73
+        tdengineService.insertBatch(dbName, tableName, batchToInsert);
71
     }
74
     }
72
 }
75
 }

+ 74
- 2
iot-platform/src/test/java/com/iot/platform/mqtt/MqttChargeStationConsumerTest.java Просмотреть файл

9
 import org.mockito.Mock;
9
 import org.mockito.Mock;
10
 import org.mockito.junit.jupiter.MockitoExtension;
10
 import org.mockito.junit.jupiter.MockitoExtension;
11
 
11
 
12
-import java.util.List;
12
+import java.lang.reflect.Method;
13
+import java.time.LocalDate;
14
+import java.util.*;
13
 import java.util.concurrent.ExecutorService;
15
 import java.util.concurrent.ExecutorService;
14
 import java.util.concurrent.Executors;
16
 import java.util.concurrent.Executors;
15
 import java.util.concurrent.ScheduledExecutorService;
17
 import java.util.concurrent.ScheduledExecutorService;
16
 
18
 
17
-import static org.assertj.core.api.Assertions.assertThat;
19
+import static org.assertj.core.api.Assertions.*;
18
 import static org.mockito.Mockito.*;
20
 import static org.mockito.Mockito.*;
19
 
21
 
20
 @ExtendWith(MockitoExtension.class)
22
 @ExtendWith(MockitoExtension.class)
81
 
83
 
82
         assertThat(result).containsExactly("station/test");
84
         assertThat(result).containsExactly("station/test");
83
     }
85
     }
86
+
87
+    @Test
88
+    @DisplayName("processMessageAndWriteToTdEngine: 正常数据写入 TDengine")
89
+    void processMessage_normalData_callsInsertBatch() throws Exception {
90
+        Method method = MqttChargeStationConsumer.class.getDeclaredMethod(
91
+                "processMessageAndWriteToTdEngine", List.class, String.class);
92
+        method.setAccessible(true);
93
+
94
+        Map<String, Object> data1 = new HashMap<>();
95
+        data1.put("voltage", 220);
96
+        List<Map<String, Object>> dataList = Collections.singletonList(data1);
97
+        String topic = "prefix/myDb/suffix/mySuperTable/extra";
98
+
99
+        LocalDate now = LocalDate.now();
100
+        String expectedTable = "mySuperTable_" + now.getYear() + String.format("%02d", now.getMonthValue());
101
+
102
+        method.invoke(consumer, dataList, topic);
103
+
104
+        verify(tdengineService).insertBatch(eq("myDb"), eq(expectedTable), anyList());
105
+    }
106
+
107
+    @Test
108
+    @DisplayName("processMessageAndWriteToTdEngine: 空数据列表直接返回")
109
+    void processMessage_emptyDataList_skipsInsertBatch() throws Exception {
110
+        Method method = MqttChargeStationConsumer.class.getDeclaredMethod(
111
+                "processMessageAndWriteToTdEngine", List.class, String.class);
112
+        method.setAccessible(true);
113
+
114
+        method.invoke(consumer, Collections.emptyList(), "prefix/db/suffix/table/extra");
115
+
116
+        verify(tdengineService, never()).insertBatch(anyString(), anyString(), anyList());
117
+    }
118
+
119
+    @Test
120
+    @DisplayName("processMessageAndWriteToTdEngine: 无效 topic 格式抛出异常")
121
+    void processMessage_invalidTopic_throwsException() throws Exception {
122
+        Method method = MqttChargeStationConsumer.class.getDeclaredMethod(
123
+                "processMessageAndWriteToTdEngine", List.class, String.class);
124
+        method.setAccessible(true);
125
+
126
+        Map<String, Object> data = new HashMap<>();
127
+        data.put("voltage", 220);
128
+
129
+        assertThatThrownBy(() -> method.invoke(consumer, Collections.singletonList(data), "short/topic"))
130
+                .isInstanceOf(java.lang.reflect.InvocationTargetException.class)
131
+                .hasCauseInstanceOf(IllegalArgumentException.class);
132
+    }
133
+
134
+    @Test
135
+    @DisplayName("processMessageAndWriteToTdEngine: null 和空 map 被过滤")
136
+    void processMessage_nullAndEmptyMaps_filtered() throws Exception {
137
+        Method method = MqttChargeStationConsumer.class.getDeclaredMethod(
138
+                "processMessageAndWriteToTdEngine", List.class, String.class);
139
+        method.setAccessible(true);
140
+
141
+        List<Map<String, Object>> dataList = new ArrayList<>();
142
+        dataList.add(null);
143
+        dataList.add(new HashMap<>());
144
+        Map<String, Object> valid = new HashMap<>();
145
+        valid.put("current", 10);
146
+        dataList.add(valid);
147
+
148
+        String topic = "prefix/myDb/suffix/mySuperTable/extra";
149
+        LocalDate now = LocalDate.now();
150
+        String expectedTable = "mySuperTable_" + now.getYear() + String.format("%02d", now.getMonthValue());
151
+
152
+        method.invoke(consumer, dataList, topic);
153
+
154
+        verify(tdengineService).insertBatch(eq("myDb"), eq(expectedTable), argThat(list -> list.size() == 1));
155
+    }
84
 }
156
 }

Загрузка…
Отмена
Сохранить