Переглянути джерело

refactor: P0-P2 security, quality, deployment overhaul

Security (P0):
- Remove hardcoded MySQL password from SysrealtimeService
- Add table name whitelist validation (regex + length limit)
- Validate ${tableName} in SysrealtimeMapper.xml
- Externalize all credentials to .env file
- Fix unbounded thread pools in MQTT consumers
- Fix mysqlWritePool leak in MqttFaultConsumer

Quality (P1):
- Extract AbstractMqttConsumer base class (-500+ duplicate lines)
- Refactor VehicleSyncTask: SCAN instead of KEYS, batch ops,
  RestTemplate timeout, loop-invariant extraction
- Add unit tests (RedisKeys, SysrealtimeService, MqttConsumer)

Architecture (P2):
- Add Spring Boot Actuator (/actuator/health)
- Create RedisKeys constants class for key namespace
- Add .env.example and start.sh for local development

Deployment:
- Add systemd service (iot-platform.service)
- Add deploy.sh with build, upload, backup, health check, rollback
- Add setup-server.sh for server initialization
- Add health-check.sh (Actuator + TCP port fallback)

Docs:
- Update CLAUDE.md for standalone iot-platform architecture
- Update README.md with build/deploy instructions
- Add deploy/README.md
mqy20260511
humanleft 4 тижднів тому
джерело
коміт
c3c35ae204
25 змінених файлів з 1379 додано та 907 видалено
  1. 4
    0
      .gitignore
  2. 115
    92
      CLAUDE.md
  3. 62
    0
      README.md
  4. 158
    0
      deploy/README.md
  5. 169
    0
      deploy/deploy.sh
  6. 48
    0
      deploy/health-check.sh
  7. 32
    0
      deploy/iot-platform.service
  8. 130
    0
      deploy/setup-server.sh
  9. 22
    0
      iot-platform/.env.example
  10. 6
    0
      iot-platform/pom.xml
  11. 47
    0
      iot-platform/src/main/java/com/iot/platform/common/RedisKeys.java
  12. 1
    0
      iot-platform/src/main/java/com/iot/platform/mapper/SysrealtimeMapper.java
  13. 177
    0
      iot-platform/src/main/java/com/iot/platform/mqtt/AbstractMqttConsumer.java
  14. 55
    277
      iot-platform/src/main/java/com/iot/platform/mqtt/MqttFaultConsumer.java
  15. 27
    227
      iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java
  16. 25
    229
      iot-platform/src/main/java/com/iot/platform/mqtt/MqttStatusConsumer.java
  17. 32
    39
      iot-platform/src/main/java/com/iot/platform/service/SysrealtimeService.java
  18. 63
    31
      iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java
  19. 7
    7
      iot-platform/src/main/resources/application-druid.yml
  20. 15
    5
      iot-platform/src/main/resources/application.yml
  21. 4
    0
      iot-platform/src/main/resources/mapper/SysrealtimeMapper.xml
  22. 57
    0
      iot-platform/src/test/java/com/iot/platform/common/RedisKeysTest.java
  23. 24
    0
      iot-platform/src/test/java/com/iot/platform/mqtt/MqttGenericConsumerTest.java
  24. 83
    0
      iot-platform/src/test/java/com/iot/platform/service/SysrealtimeServiceTest.java
  25. 16
    0
      start.sh

+ 4
- 0
.gitignore Переглянути файл

@@ -48,3 +48,7 @@ nbdist/
48 48
 
49 49
 # Claude
50 50
 .claude/
51
+
52
+# Environment variables
53
+.env
54
+!.env.example

+ 115
- 92
CLAUDE.md Переглянути файл

@@ -4,162 +4,185 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
4 4
 
5 5
 ## Project Overview
6 6
 
7
-This is **RuoYi v3.9.0** (若依), a Chinese Spring Boot + Vue admin framework customized for IoT/Energy Storage Management (储能运营平台). It extends the standard RuoYi RBAC system with IoT device management, time-series data ingestion, MQTT messaging, and vehicle/equipment tracking.
7
+This is an **IoT Energy Storage Management Platform** (储能运营平台), originally based on RuoYi v3.9.0 but now stripped down to a standalone Spring Boot module focused on IoT device telemetry ingestion, MQTT messaging, time-series data storage, and vehicle/equipment tracking.
8 8
 
9 9
 - **Backend**: Spring Boot 2.5.15, Java 8, Spring Security 5.7.12, JWT authentication
10 10
 - **Frontend**: Vue 2 + Element UI (separate repository, not in this workspace)
11
-- **Documentation**: http://doc.ruoyi.vip
11
+- **Database**: MySQL 8 (dual datasource via Druid), TDengine 3.x for time-series
12
+- **Cache**: Redis (localhost:6379)
13
+- **Message Broker**: MQTT (EMQX on 47.104.204.180:1883)
12 14
 
13 15
 ## Build & Run Commands
14 16
 
15
-This is a Maven multi-module project. All commands should be run from the repository root.
16
-
17 17
 ```bash
18
-# Compile all modules
19
-mvn clean compile
20
-
21
-# Package into executable JAR (skips tests — there are no tests in this project)
22
-mvn clean package -Dmaven.test.skip=true
18
+# Build the iot-platform module
19
+mvn clean package -pl iot-platform -am -Dmaven.test.skip=true
23 20
 
24
-# Run the application (after packaging)
25
-cd ruoyi-admin/target
26
-java -jar ruoyi-admin.jar
21
+# Run locally with .env
22
+source .env && cd iot-platform/target && java -jar iot-platform.jar
27 23
 
28
-# Or use the provided script
29
-./ry.sh start    # start / stop / restart / status
24
+# Or use the convenience script
25
+./start.sh start
30 26
 ```
31 27
 
32
-The application starts on **port 8887**. The executable JAR is `ruoyi-admin/target/ruoyi-admin.jar`.
28
+The application starts on **port 8887**. The executable JAR is `iot-platform/target/iot-platform.jar`.
33 29
 
34 30
 ## Module Architecture
35 31
 
32
+This is now a **single-module Spring Boot application** (previously a multi-module RuoYi project).
33
+
36 34
 ```
37
-ruoyi-admin       → Web layer: REST controllers, Swagger config, application.yml entrypoint
38
-ruoyi-framework   → Infrastructure: Spring Security config, JWT filter, Druid datasource,
39
-                    Redis config, MyBatis config, AOP aspects (logging, rate limit, data scope)
40
-ruoyi-system      → Business layer: domain models, mappers (MyBatis), services,
41
-                    service implementations — this is where custom IoT logic lives
42
-ruoyi-common      → Shared utilities: constants, enums, exception types, utils (StringUtils,
43
-                    SecurityUtils, DateUtils, ServletUtils, etc.)
44
-ruoyi-quartz      → Scheduled job engine (standard RuoYi)
45
-ruoyi-generator   → Code generation templates (standard RuoYi)
35
+iot-platform/
36
+├── controller     → REST API endpoints, Swagger config
37
+├── service        → Business logic: IoT data sync, MQTT consumers, TDengine ops
38
+├── domain         → Entity classes, DTOs, VO objects
39
+├── mapper         → MyBatis mapper interfaces + XML in resources/mapper/
40
+├── mqtt           → MQTT consumers (AbstractMqttConsumer base class)
41
+├── task           → Scheduled tasks (VehicleSyncTask, etc.)
42
+├── config         → Spring configuration classes
43
+├── common         → Shared constants (RedisKeys), utilities
44
+└── datasource     → Dynamic datasource configuration
46 45
 ```
47 46
 
48
-Dependency flow: `admin → framework → system → common`. `quartz` and `generator` are optional add-ons.
49
-
50 47
 ## Data Architecture
51 48
 
52 49
 ### Dual MySQL Databases (Druid connection pool)
53
-- **Master** (`data`): RuoYi system tables (users, roles, menus, logs, custom business tables)
50
+- **Master** (`data`): System tables (users, roles, menus) + business tables
54 51
 - **Slave** (`cnc`): Secondary data — configured in `application-druid.yml`
55 52
 - Both hosted on `47.104.204.180:3306`
56 53
 - Dynamic datasource switching via `@DataSource` annotation
57 54
 
58 55
 ### TDengine (Time-Series Database)
59 56
 - Used for high-frequency IoT telemetry ingestion
60
-- Connection pool: HikariCP via `TDengineService` (`ruoyi-system/.../service/TDengineService.java`)
57
+- Connection pool: HikariCP via `TDengineService`
61 58
 - Connects to `jdbc:TAOS://localhost:6030/`
62
-- Super-table pattern: `CREATE STABLE ... (ts TIMESTAMP, surfacename VARCHAR(64)) TAGS (location BINARY(64))`
63
-- Column caching in memory (`stableColumnCache`) to avoid repeated `DESCRIBE` calls
59
+- Super-table pattern with column caching (`stableColumnCache`)
64 60
 
65 61
 ### Redis
66 62
 - Host: `localhost:6379`
67
-- Key patterns used by custom code:
68
-  - `DSB:active:devices` — Set of active IoT device Redis keys
69
-  - `DSB:<controllerId>:<metricName>` — Hash storing device telemetry fields
70
-  - `workorder:coordinate:<controllerId>` — Hash storing vehicle GPS (latitude, longitude)
71
-  - `<controllerId>:<topicName>` / `<controllerId>_cmd:<topicName>` / `<controllerId>_fault:<topicName>` — MQTT topic metadata
72
-
73
-## Custom Business Domains (Beyond Standard RuoYi)
74
-
75
-Standard RuoYi provides users, roles, menus, depts, posts, dicts, config, notices, operlog, logininfor. This project adds:
76
-
77
-| Domain | Purpose | Key Files |
78
-|--------|---------|-----------|
79
-| **SysDevice** | IoT device telemetry key-value storage | `domain/SysDevice.java`, `mapper/SysDeviceMapper.java`, `service/SysDeviceService.java` |
80
-| **SysDeviceControl** | Device configuration / metric whitelist | `domain/SysDeviceControl.java`, `service/SysDeviceControlService.java` |
81
-| **SysDeviceVo** | Aggregated device view for frontend | `domain/vo/SysDeviceVo.java`, `service/SysDeviceVoService.java` |
82
-| **SysCar** | Vehicle/equipment with GPS tracking | `domain/SysCar.java`, `service/SysCarService.java` |
83
-| **SysAlarm** | Alarm events from devices | `domain/SysAlarm.java`, `service/SysAlarmService.java` |
84
-| **SysFault** | Fault records | `domain/SysFault.java`, `service/SysFaultService.java` |
85
-| **SysWorkorder** | Work orders / service tickets | `domain/SysWorkorder.java`, `service/SysWorkorderService.java` |
86
-| **SysCompany** | Partner company management | `domain/SysCompany.java`, `service/SysCompanyService.java` |
87
-| **SysIndicators** | Daily business KPIs (order count, profit) | `domain/SysIndicators.java`, `service/SysIndicatorsService.java` |
88
-| **SysController** | MQTT topic registry per controller | `domain/SysController.java`, `service/SysControllerService.java` |
89
-| **Sysrealtime** | Real-time device data sync from Redis → MySQL | `domain/Sysrealtime.java`, `service/SysrealtimeService.java` |
90
-| **ControllerData / topics** | DTOs for MQTT JSON payload parsing | `domain/ControllerData.java`, `domain/topics.java` |
91
-| **MqttClientWrapper** | Wrapper around Eclipse Paho MQTT client | `domain/MqttClientWrapper.java` |
63
+- Key patterns (defined in `common/RedisKeys.java`):
64
+  - `DSB:active:devices` — Set of active IoT device keys
65
+  - `DSB:<controllerId>:<metricName>` — Hash storing device telemetry
66
+  - `workorder:coordinate:<controllerId>` — GPS coordinates
67
+  - `<controllerId>:<topicName>` — MQTT topic metadata
68
+
69
+## Key Configuration Files
70
+
71
+- `iot-platform/src/main/resources/application.yml` — Main config (port 8887, Redis, MyBatis, Actuator)
72
+- `iot-platform/src/main/resources/application-druid.yml` — Database connection (Druid pool, master/slave)
73
+- `iot-platform/src/main/resources/logback-spring.xml` — Logging config with rolling
74
+- `.env` — Environment variables (passwords, credentials) loaded by systemd or start.sh
75
+
76
+## Environment Variables
77
+
78
+All sensitive credentials are externalized to `.env`:
79
+
80
+```bash
81
+MYSQL_USERNAME=root
82
+MYSQL_PASSWORD="..."
83
+REDIS_PASSWORD=
84
+MQTT_USERNAME=...
85
+MQTT_PASSWORD=...
86
+TDENGINE_USERNAME=root
87
+TDENGINE_PASSWORD=taosdata
88
+DRUID_STAT_ENABLED=false
89
+DRUID_USERNAME=ruoyi
90
+DRUID_PASSWORD=...
91
+```
92
+
93
+Spring Boot config uses `${ENV_NAME:default}` syntax for all sensitive values.
92 94
 
93 95
 ## IoT Data Flow
94 96
 
95 97
 ```
96 98
 MQTT Broker (47.104.204.180:1883)
97
-    ↓  subscribes to "+/generics"
98
-TdengineController2 / TdegnineController3 (MQTT listeners)
99
-    ↓  parse JSON → ControllerData → topics
99
+    ↓  subscribes to "+/generics", "+/status", "+/fault"
100
+MqttGenericConsumer / MqttStatusConsumer / MqttFaultConsumer
101
+    ↓  parse JSON → ControllerData
100 102
     ├─→ Redis (DSB:active:devices, DSB:<id>:<metric> hashes)
101
-    ├─→ MySQL (sys_controller table — topic registry)
102
-    └─→ TDengine (time-series telemetry)
103
+    ├─→ MySQL (sys_controller, sys_device tables)
104
+    └─→ TDengine (time-series telemetry tables)
103 105
 
104
-SysCarController (scheduled tasks, every 30s)
106
+VehicleSyncTask (@Scheduled every 30s)
105 107
     ↓ reads Redis DSB:* keys
106 108
     ├─→ syncs to sysrealtime (MySQL)
107 109
     ├─→ updates vehicle GPS in sys_car (MySQL)
108 110
     ├─→ updates sys_indicators KPIs (MySQL)
109
-    └─→ triggers external webhook (https://esos-iot.com:9443/syscar/trigger)
111
+    └─→ triggers external webhook
110 112
 ```
111 113
 
112
-There are **4 `@Scheduled(fixedRate = 30000)` tasks** in `SysCarController`:
113
-1. `updatesyscar()` — sync vehicle GPS from Redis to MySQL, trigger webhook on change
114
-2. `insertdevice()` — sync device config from Redis to `sys_device_vo`
115
-3. `syncRedisToMySQL()` — sync telemetry from Redis to `sysrealtime`
116
-4. `insertindicators()` — aggregate daily KPIs per company into `sys_indicators`
114
+## MQTT Consumers
117 115
 
118
-## Key Configuration Files
116
+All MQTT consumers extend `AbstractMqttConsumer` which provides:
117
+- Connection management (reconnect, keepalive)
118
+- `checkServerAvailability()` with retry limits
119
+- Graceful shutdown via `@PreDestroy`
120
+
121
+Implementations:
122
+- `MqttGenericConsumer` — General telemetry ingestion
123
+- `MqttStatusConsumer` — Device status messages
124
+- `MqttFaultConsumer` — Fault/alarm messages
125
+
126
+## Deployment
127
+
128
+Production deployment uses systemd on online180 (47.104.204.180).
129
+
130
+```bash
131
+# Deploy from local
132
+cd deploy
133
+./deploy.sh --build          # Build and deploy
134
+./deploy.sh --jar <path>     # Deploy specific jar
135
+
136
+# Server management
137
+systemctl status iot-platform
138
+systemctl restart iot-platform
139
+journalctl -u iot-platform -f
140
+```
119 141
 
120
-- `ruoyi-admin/src/main/resources/application.yml` — Main config (port 8887, Redis, MyBatis, PageHelper, Swagger, token settings)
121
-- `ruoyi-admin/src/main/resources/application-druid.yml` — Database connection (master/slave MySQL, Druid pool)
122
-- `ruoyi-admin/src/main/resources/mybatis/mybatis-config.xml` — MyBatis global settings
123
-- `ruoyi-admin/src/main/resources/logback.xml` — Logging config
142
+See `deploy/README.md` for full deployment documentation.
124 143
 
125 144
 ## Authentication & Security
126 145
 
127
-- JWT-based stateless auth (token header: `Authorization`)
128
-- Spring Security config in `ruoyi-framework/.../config/SecurityConfig.java`
129
-- JWT filter: `JwtAuthenticationTokenFilter.java`
146
+- JWT-based stateless auth (header: `Authorization`)
147
+- Spring Security config in `config/SecurityConfig.java`
130 148
 - Password max retry: 5 attempts, lock time: 10 minutes
131
-- Swagger enabled at `/dev-api` path mapping
132
-- XSS filtering enabled for `/system/*`, `/monitor/*`, `/tool/*`
149
+- XSS filtering enabled
150
+- SQL injection prevention: field whitelist + parameterized queries in TDengineService
151
+- Table name validation: regex `^[a-zA-Z_][a-zA-Z0-9_]*$` in SysrealtimeService
133 152
 
134 153
 ## API Response Format
135 154
 
136
-All controllers return `AjaxResult` (from `ruoyi-common`), which wraps:
155
+All controllers return `AjaxResult`:
137 156
 ```java
138 157
 {
139
-  "code": 200,          // HttpStatus constant
140
-  "msg": "操作成功",     // message
141
-  "data": { ... }       // payload
158
+  "code": 200,
159
+  "msg": "操作成功",
160
+  "data": { ... }
142 161
 }
143 162
 ```
144 163
 
145
-Page queries use `TableDataInfo` which wraps:
164
+Page queries use `TableDataInfo`:
146 165
 ```java
147 166
 {
148 167
   "code": 200,
149 168
   "msg": "查询成功",
150
-  "rows": [ ... ],      // list data
151
-  "total": 100          // total count
169
+  "rows": [ ... ],
170
+  "total": 100
152 171
 }
153 172
 ```
154 173
 
174
+## Health Check
175
+
176
+Spring Boot Actuator is enabled at `/actuator/health`:
177
+```bash
178
+curl http://localhost:8887/actuator/health
179
+# {"status":"UP"}
180
+```
181
+
155 182
 ## Important Implementation Notes
156 183
 
157
-- **No unit tests exist** in this project. Test coverage is 0%.
158
-- **Field injection is used everywhere** (`@Autowired` on fields), not constructor injection.
159 184
 - **MyBatis mappers are Java interfaces** with XML mapping files in `src/main/resources/mapper/`
160
-- **Dynamic SQL provider**: `ruoyi-system/.../mapper/DynamicSqlProvider.java` provides programmatic SQL for some queries
161
-- **Service naming convention**: RuoYi standard services use `ISysXxxService` interface + `SysXxxServiceImpl` implementation. Custom services (added by this project) often omit the `I` prefix and use concrete classes directly.
162
-- **TDengine SQL is built with string concatenation** in `TDengineService.java` — be careful with SQL injection if user input reaches those paths.
163
-- **MQTT credentials are hardcoded** in `TdengineController2.java` and `TdegnineController3.java`.
164
-- **Database passwords are in plaintext** in `application-druid.yml`.
165
-- **The project uses Alibaba Maven mirror** (`https://maven.aliyun.com/repository/public`) configured in root `pom.xml`.
185
+- **Service naming**: Custom IoT services often use concrete classes directly (no `I` prefix)
186
+- **TDengine SQL**: Built with string concatenation but protected by field whitelist (`ALLOWED_COLUMNS`) and `escapeValue()`
187
+- **No distributed locks**: `VehicleSyncTask` scheduled tasks may duplicate in clustered environments
188
+- **Alibaba Maven mirror** (`https://maven.aliyun.com/repository/public`) configured in root `pom.xml`

+ 62
- 0
README.md Переглянути файл

@@ -3,6 +3,10 @@
3 3
 </p>
4 4
 <h1 align="center" style="margin: 30px 0 30px; font-weight: bold;">RuoYi v3.9.0</h1>
5 5
 <h4 align="center">基于SpringBoot+Vue前后端分离的Java快速开发框架</h4>
6
+
7
+> **本项目说明**:此仓库为 RuoYi v3.9.0 剥离后的 <b>IoT 储能运营平台</b> 独立模块。移除了 RuoYi 原生的 admin/framework/system/common/quartz/generator 等模块,仅保留 IoT 业务核心代码(设备管理、MQTT 消息接入、时序数据存储、车辆定位同步等)。
8
+>
9
+> 生产服务器:`online180` (47.104.204.180),使用 systemd 管理服务,部署脚本位于 `deploy/` 目录。
6 10
 <p align="center">
7 11
 	<a href="https://gitee.com/y_project/RuoYi-Vue/stargazers"><img src="https://gitee.com/y_project/RuoYi-Vue/badge/star.svg?theme=dark"></a>
8 12
 	<a href="https://gitee.com/y_project/RuoYi-Vue"><img src="https://img.shields.io/badge/RuoYi-v3.9.0-brightgreen.svg"></a>
@@ -90,6 +94,64 @@
90 94
 </table>
91 95
 
92 96
 
97
+## IoT 平台构建与部署
98
+
99
+### 本地构建
100
+
101
+```bash
102
+# 构建 iot-platform 模块
103
+mvn clean package -pl iot-platform -am -Dmaven.test.skip=true
104
+
105
+# 本地运行(需先配置 .env)
106
+source .env && ./start.sh start
107
+```
108
+
109
+### 环境变量
110
+
111
+项目根目录 `.env` 文件包含所有敏感凭据(已加入 `.gitignore`):
112
+
113
+```bash
114
+MYSQL_USERNAME=root
115
+MYSQL_PASSWORD="..."
116
+MQTT_USERNAME=...
117
+MQTT_PASSWORD=...
118
+TDENGINE_USERNAME=root
119
+TDENGINE_PASSWORD=taosdata
120
+```
121
+
122
+Spring Boot 配置使用 `${ENV_NAME:default}` 语法读取这些变量。
123
+
124
+### 生产部署
125
+
126
+```bash
127
+cd deploy
128
+
129
+# 一键构建并部署到 online180
130
+./deploy.sh --build
131
+
132
+# 或指定 jar 部署
133
+./deploy.sh --jar ../iot-platform/target/iot-platform.jar
134
+```
135
+
136
+部署流程:构建 → 上传 → 备份 → 停止 → 替换 → 启动 → 健康检查 → 失败自动回滚。
137
+
138
+详见 `deploy/README.md`。
139
+
140
+### 服务器管理
141
+
142
+```bash
143
+# 查看状态
144
+systemctl status iot-platform
145
+
146
+# 查看实时日志
147
+journalctl -u iot-platform -f
148
+
149
+# 重启服务
150
+systemctl restart iot-platform
151
+```
152
+
153
+---
154
+
93 155
 ## 若依前后端分离交流群
94 156
 
95 157
 QQ群: [![加入QQ群](https://img.shields.io/badge/已满-937441-blue.svg)](https://jq.qq.com/?_wv=1027&k=5bVB1og) [![加入QQ群](https://img.shields.io/badge/已满-887144332-blue.svg)](https://jq.qq.com/?_wv=1027&k=5eiA4DH) [![加入QQ群](https://img.shields.io/badge/已满-180251782-blue.svg)](https://jq.qq.com/?_wv=1027&k=5AxMKlC) [![加入QQ群](https://img.shields.io/badge/已满-104180207-blue.svg)](https://jq.qq.com/?_wv=1027&k=51G72yr) [![加入QQ群](https://img.shields.io/badge/已满-186866453-blue.svg)](https://jq.qq.com/?_wv=1027&k=VvjN2nvu) [![加入QQ群](https://img.shields.io/badge/已满-201396349-blue.svg)](https://jq.qq.com/?_wv=1027&k=5vYAqA05) [![加入QQ群](https://img.shields.io/badge/已满-101456076-blue.svg)](https://jq.qq.com/?_wv=1027&k=kOIINEb5) [![加入QQ群](https://img.shields.io/badge/已满-101539465-blue.svg)](https://jq.qq.com/?_wv=1027&k=UKtX5jhs) [![加入QQ群](https://img.shields.io/badge/已满-264312783-blue.svg)](https://jq.qq.com/?_wv=1027&k=EI9an8lJ) [![加入QQ群](https://img.shields.io/badge/已满-167385320-blue.svg)](https://jq.qq.com/?_wv=1027&k=SWCtLnMz) [![加入QQ群](https://img.shields.io/badge/已满-104748341-blue.svg)](https://jq.qq.com/?_wv=1027&k=96Dkdq0k) [![加入QQ群](https://img.shields.io/badge/已满-160110482-blue.svg)](https://jq.qq.com/?_wv=1027&k=0fsNiYZt) [![加入QQ群](https://img.shields.io/badge/已满-170801498-blue.svg)](https://jq.qq.com/?_wv=1027&k=7xw4xUG1) [![加入QQ群](https://img.shields.io/badge/已满-108482800-blue.svg)](https://jq.qq.com/?_wv=1027&k=eCx8eyoJ) [![加入QQ群](https://img.shields.io/badge/已满-101046199-blue.svg)](https://jq.qq.com/?_wv=1027&k=SpyH2875) [![加入QQ群](https://img.shields.io/badge/已满-136919097-blue.svg)](https://jq.qq.com/?_wv=1027&k=tKEt51dz) [![加入QQ群](https://img.shields.io/badge/已满-143961921-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=0vBbSb0ztbBgVtn3kJS-Q4HUNYwip89G&authKey=8irq5PhutrZmWIvsUsklBxhj57l%2F1nOZqjzigkXZVoZE451GG4JHPOqW7AW6cf0T&noverify=0&group_code=143961921) [![加入QQ群](https://img.shields.io/badge/已满-174951577-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=ZFAPAbp09S2ltvwrJzp7wGlbopsc0rwi&authKey=HB2cxpxP2yspk%2Bo3WKTBfktRCccVkU26cgi5B16u0KcAYrVu7sBaE7XSEqmMdFQp&noverify=0&group_code=174951577) [![加入QQ群](https://img.shields.io/badge/已满-161281055-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=Fn2aF5IHpwsy8j6VlalNJK6qbwFLFHat&authKey=uyIT%2B97x2AXj3odyXpsSpVaPMC%2Bidw0LxG5MAtEqlrcBcWJUA%2FeS43rsF1Tg7IRJ&noverify=0&group_code=161281055) [![加入QQ群](https://img.shields.io/badge/已满-138988063-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=XIzkm_mV2xTsUtFxo63bmicYoDBA6Ifm&authKey=dDW%2F4qsmw3x9govoZY9w%2FoWAoC4wbHqGal%2BbqLzoS6VBarU8EBptIgPKN%2FviyC8j&noverify=0&group_code=138988063) [![加入QQ群](https://img.shields.io/badge/已满-151450850-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=DkugnCg68PevlycJSKSwjhFqfIgrWWwR&authKey=pR1Pa5lPIeGF%2FFtIk6d%2FGB5qFi0EdvyErtpQXULzo03zbhopBHLWcuqdpwY241R%2F&noverify=0&group_code=151450850) [![加入QQ群](https://img.shields.io/badge/已满-224622315-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=F58bgRa-Dp-rsQJThiJqIYv8t4-lWfXh&authKey=UmUs4CVG5OPA1whvsa4uSespOvyd8%2FAr9olEGaWAfdLmfKQk%2FVBp2YU3u2xXXt76&noverify=0&group_code=224622315) [![加入QQ群](https://img.shields.io/badge/已满-287842588-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=Nxb2EQ5qozWa218Wbs7zgBnjLSNk_tVT&authKey=obBKXj6SBKgrFTJZx0AqQnIYbNOvBB2kmgwWvGhzxR67RoRr84%2Bus5OadzMcdJl5&noverify=0&group_code=287842588) [![加入QQ群](https://img.shields.io/badge/已满-187944233-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=numtK1M_I4eVd2Gvg8qtbuL8JgX42qNh&authKey=giV9XWMaFZTY%2FqPlmWbkB9g3fi0Ev5CwEtT9Tgei0oUlFFCQLDp4ozWRiVIzubIm&noverify=0&group_code=187944233) [![加入QQ群](https://img.shields.io/badge/已满-228578329-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=G6r5KGCaa3pqdbUSXNIgYloyb8e0_L0D&authKey=4w8tF1eGW7%2FedWn%2FHAypQksdrML%2BDHolQSx7094Agm7Luakj9EbfPnSTxSi2T1LQ&noverify=0&group_code=228578329) [![加入QQ群](https://img.shields.io/badge/191164766-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=GsOo-OLz53J8y_9TPoO6XXSGNRTgbFxA&authKey=R7Uy%2Feq%2BZsoKNqHvRKhiXpypW7DAogoWapOawUGHokJSBIBIre2%2FoiAZeZBSLuBc&noverify=0&group_code=191164766) 点击按钮入群。

+ 158
- 0
deploy/README.md Переглянути файл

@@ -0,0 +1,158 @@
1
+# IoT Platform 部署文档
2
+
3
+## 概述
4
+
5
+本项目使用 systemd 管理进程,通过 `deploy.sh` 脚本实现一键构建、上传、备份、健康检查和自动回滚。
6
+
7
+## 服务器环境
8
+
9
+- **服务器**: online180 (47.104.204.180)
10
+- **OS**: Alibaba Cloud Linux 3
11
+- **Java**: OpenJDK 1.8.0_412
12
+- **安装目录**: `/opt/iot-platform/`
13
+- **服务名称**: `iot-platform`
14
+
15
+## 目录结构
16
+
17
+```
18
+/opt/iot-platform/
19
+├── iot-platform.jar          # 当前运行版本
20
+├── .env                      # 环境变量(密码、凭据)
21
+├── backup/                   # 版本备份(自动创建)
22
+├── bin/
23
+│   ├── deploy.sh             # 一键部署脚本(本地运行)
24
+│   └── health-check.sh       # 健康检查脚本
25
+├── config/                   # 线上专属配置
26
+└── logs/                     # 日志输出
27
+```
28
+
29
+## 快速开始
30
+
31
+### 首次部署(服务器初始化)
32
+
33
+如果服务器上还没有安装 systemd 服务,执行初始化:
34
+
35
+```bash
36
+cd deploy
37
+scp setup-server.sh root@47.104.204.180:/tmp/
38
+scp iot-platform.service root@47.104.204.180:/tmp/
39
+scp ../.env root@47.104.204.180:/tmp/iot-platform.env
40
+ssh root@47.104.204.180 "chmod +x /tmp/setup-server.sh && /tmp/setup-server.sh"
41
+```
42
+
43
+### 日常部署
44
+
45
+#### 方式一:本地构建后部署
46
+
47
+```bash
48
+cd deploy
49
+./deploy.sh --build
50
+```
51
+
52
+流程:
53
+1. 本地执行 `mvn clean package`
54
+2. 上传 jar 到服务器
55
+3. 备份当前版本(带时间戳)
56
+4. 停止 systemd 服务
57
+5. 替换 jar
58
+6. 启动服务
59
+7. 健康检查(`/actuator/health`)
60
+8. 失败则自动回滚到上一个版本
61
+
62
+#### 方式二:指定 jar 部署
63
+
64
+```bash
65
+cd deploy
66
+./deploy.sh --jar ../iot-platform/target/iot-platform.jar
67
+```
68
+
69
+#### 方式三:CI/CD 构建后部署
70
+
71
+```bash
72
+cd deploy
73
+./deploy.sh --jar /path/to/ci/artifacts/iot-platform.jar
74
+```
75
+
76
+## 环境变量
77
+
78
+`.env` 文件位于项目根目录和服务器 `/opt/iot-platform/.env`,包含所有敏感凭据:
79
+
80
+```bash
81
+# MySQL
82
+MYSQL_USERNAME=root
83
+MYSQL_PASSWORD="..."
84
+
85
+# Redis
86
+REDIS_PASSWORD=
87
+
88
+# MQTT
89
+MQTT_USERNAME=...
90
+MQTT_PASSWORD=...
91
+
92
+# TDengine
93
+TDENGINE_USERNAME=root
94
+TDENGINE_PASSWORD=taosdata
95
+
96
+# Druid 监控
97
+DRUID_STAT_ENABLED=false
98
+DRUID_USERNAME=ruoyi
99
+DRUID_PASSWORD=...
100
+```
101
+
102
+**注意**:`.env` 文件已加入 `.gitignore`,**切勿提交到 Git**。
103
+
104
+## 服务器管理
105
+
106
+```bash
107
+# 查看状态
108
+systemctl status iot-platform
109
+
110
+# 重启
111
+systemctl restart iot-platform
112
+
113
+# 停止
114
+systemctl stop iot-platform
115
+
116
+# 查看日志
117
+journalctl -u iot-platform -f
118
+
119
+# 查看历史日志
120
+journalctl -u iot-platform --since "1 hour ago"
121
+```
122
+
123
+## 健康检查
124
+
125
+```bash
126
+# 在服务器上执行
127
+bash /opt/iot-platform/bin/health-check.sh localhost 8887 30
128
+
129
+# 或从本地检查
130
+curl -s http://47.104.204.180:8887/actuator/health
131
+```
132
+
133
+健康检查策略:
134
+1. 优先检查 `/actuator/health`(Spring Boot Actuator)
135
+2. 如果 Actuator 未启用,降级为 TCP 端口监听检查
136
+
137
+## 回滚
138
+
139
+如果部署失败,`deploy.sh` 会自动回滚到上一个版本。
140
+
141
+手动回滚:
142
+
143
+```bash
144
+ssh root@47.104.204.180
145
+systemctl stop iot-platform
146
+ls /opt/iot-platform/backup/          # 查看可用备份
147
+cp /opt/iot-platform/backup/iot-platform-XXXX.jar /opt/iot-platform/iot-platform.jar
148
+systemctl start iot-platform
149
+```
150
+
151
+## 配置文件说明
152
+
153
+| 文件 | 说明 |
154
+|------|------|
155
+| `deploy.sh` | 一键部署脚本(本地执行) |
156
+| `setup-server.sh` | 服务器初始化脚本(服务器执行) |
157
+| `health-check.sh` | 健康检查脚本 |
158
+| `iot-platform.service` | systemd 服务定义 |

+ 169
- 0
deploy/deploy.sh Переглянути файл

@@ -0,0 +1,169 @@
1
+#!/bin/bash
2
+# IoT Platform 一键部署脚本
3
+# 用法:
4
+#   ./deploy.sh --build              # 本地构建后部署
5
+#   ./deploy.sh --jar path/to/jar    # 使用指定 jar 部署
6
+#   ./deploy.sh --build --jar path   # 构建并使用指定 jar(覆盖)
7
+#
8
+# 环境变量:
9
+#   SERVER_HOST    默认: 47.104.204.180
10
+#   SERVER_USER    默认: root
11
+#   REMOTE_DIR     默认: /opt/iot-platform
12
+
13
+set -e
14
+
15
+# 配置
16
+SERVER_HOST="${SERVER_HOST:-47.104.204.180}"
17
+SERVER_USER="${SERVER_USER:-root}"
18
+REMOTE_DIR="${REMOTE_DIR:-/opt/iot-platform}"
19
+APP_NAME="iot-platform"
20
+LOCAL_JAR=""
21
+DO_BUILD=false
22
+
23
+# 颜色
24
+RED='\033[0;31m'
25
+GREEN='\033[0;32m'
26
+YELLOW='\033[1;33m'
27
+NC='\033[0m'
28
+
29
+# 解析参数
30
+while [[ $# -gt 0 ]]; do
31
+    case $1 in
32
+        --build)
33
+            DO_BUILD=true
34
+            shift
35
+            ;;
36
+        --jar)
37
+            LOCAL_JAR="$2"
38
+            shift 2
39
+            ;;
40
+        --help|-h)
41
+            echo "用法: ./deploy.sh [--build] [--jar <path>]"
42
+            echo "  --build     本地执行 mvn clean package"
43
+            echo "  --jar       指定要部署的 jar 文件路径"
44
+            exit 0
45
+            ;;
46
+        *)
47
+            echo "未知参数: $1"
48
+            exit 1
49
+            ;;
50
+    esac
51
+done
52
+
53
+# 确定 jar 路径
54
+if [ "$DO_BUILD" = true ]; then
55
+    echo -e "${YELLOW}[deploy] 开始本地构建...${NC}"
56
+    cd "$(dirname "$0")/.."
57
+    mvn clean package -pl iot-platform -am -Dmaven.test.skip=true
58
+    LOCAL_JAR="iot-platform/target/iot-platform.jar"
59
+    echo -e "${GREEN}[deploy] 构建完成: ${LOCAL_JAR}${NC}"
60
+fi
61
+
62
+if [ -z "$LOCAL_JAR" ]; then
63
+    # 默认使用本地 target 下的 jar
64
+    cd "$(dirname "$0")/.."
65
+    LOCAL_JAR="iot-platform/target/iot-platform.jar"
66
+fi
67
+
68
+if [ ! -f "$LOCAL_JAR" ]; then
69
+    echo -e "${RED}[deploy] 错误: 找不到 jar 文件: ${LOCAL_JAR}${NC}"
70
+    exit 1
71
+fi
72
+
73
+JAR_NAME=$(basename "$LOCAL_JAR")
74
+TIMESTAMP=$(date +%Y%m%d-%H%M%S)
75
+BACKUP_NAME="${APP_NAME}-${TIMESTAMP}.jar"
76
+
77
+echo "========================================"
78
+echo "IoT Platform 部署"
79
+echo "========================================"
80
+echo "目标服务器: ${SERVER_USER}@${SERVER_HOST}"
81
+echo "远程目录:   ${REMOTE_DIR}"
82
+echo "本地 JAR:   ${LOCAL_JAR}"
83
+echo "========================================"
84
+
85
+# 步骤 1: 上传 JAR 到临时目录
86
+echo -e "${YELLOW}[1/6] 上传 JAR 到服务器...${NC}"
87
+ssh "${SERVER_USER}@${SERVER_HOST}" "mkdir -p ${REMOTE_DIR}/tmp ${REMOTE_DIR}/backup"
88
+scp "${LOCAL_JAR}" "${SERVER_USER}@${SERVER_HOST}:${REMOTE_DIR}/tmp/${JAR_NAME}"
89
+echo -e "${GREEN}[1/6] 上传完成${NC}"
90
+
91
+# 步骤 2: 备份当前版本
92
+echo -e "${YELLOW}[2/6] 备份当前版本...${NC}"
93
+ssh "${SERVER_USER}@${SERVER_HOST}" "
94
+    if [ -f ${REMOTE_DIR}/${APP_NAME}.jar ]; then
95
+        cp -a ${REMOTE_DIR}/${APP_NAME}.jar ${REMOTE_DIR}/backup/${BACKUP_NAME}
96
+        echo '  已备份: ${BACKUP_NAME}'
97
+        ls -lh ${REMOTE_DIR}/backup/ | tail -5
98
+    else
99
+        echo '  无现有版本,跳过备份'
100
+    fi
101
+"
102
+echo -e "${GREEN}[2/6] 备份完成${NC}"
103
+
104
+# 步骤 3: 停止服务
105
+echo -e "${YELLOW}[3/6] 停止服务...${NC}"
106
+ssh "${SERVER_USER}@${SERVER_HOST}" "systemctl stop ${APP_NAME} || true"
107
+sleep 2
108
+echo -e "${GREEN}[3/6] 服务已停止${NC}"
109
+
110
+# 步骤 4: 替换 JAR
111
+echo -e "${YELLOW}[4/6] 替换 JAR...${NC}"
112
+ssh "${SERVER_USER}@${SERVER_HOST}" "
113
+    cp ${REMOTE_DIR}/tmp/${JAR_NAME} ${REMOTE_DIR}/${APP_NAME}.jar
114
+    chmod 644 ${REMOTE_DIR}/${APP_NAME}.jar
115
+    rm -f ${REMOTE_DIR}/tmp/${JAR_NAME}
116
+    echo '  新 JAR:'
117
+    ls -lh ${REMOTE_DIR}/${APP_NAME}.jar
118
+"
119
+echo -e "${GREEN}[4/6] 替换完成${NC}"
120
+
121
+# 步骤 5: 启动服务
122
+echo -e "${YELLOW}[5/6] 启动服务...${NC}"
123
+ssh "${SERVER_USER}@${SERVER_HOST}" "systemctl start ${APP_NAME}"
124
+sleep 3
125
+
126
+echo -e "${GREEN}[5/6] 服务已启动${NC}"
127
+
128
+# 步骤 6: 健康检查
129
+echo -e "${YELLOW}[6/6] 健康检查...${NC}"
130
+if ssh "${SERVER_USER}@${SERVER_HOST}" "bash ${REMOTE_DIR}/bin/health-check.sh localhost 8887 30"; then
131
+    echo -e "${GREEN}[6/6] 健康检查通过,部署成功!${NC}"
132
+    echo ""
133
+    echo "========================================"
134
+    echo "部署摘要"
135
+    echo "========================================"
136
+    echo "时间:     ${TIMESTAMP}"
137
+    echo "JAR:      ${LOCAL_JAR}"
138
+    echo "备份:     ${BACKUP_NAME}"
139
+    echo "状态:     成功"
140
+    echo ""
141
+    echo "服务状态:"
142
+    ssh "${SERVER_USER}@${SERVER_HOST}" "systemctl status ${APP_NAME} --no-pager"
143
+    exit 0
144
+else
145
+    echo -e "${RED}[6/6] 健康检查失败! 执行回滚...${NC}"
146
+
147
+    # 回滚
148
+    echo -e "${YELLOW}[rollback] 停止服务...${NC}"
149
+    ssh "${SERVER_USER}@${SERVER_HOST}" "systemctl stop ${APP_NAME} || true"
150
+
151
+    if ssh "${SERVER_USER}@${SERVER_HOST}" "test -f ${REMOTE_DIR}/backup/${BACKUP_NAME}"; then
152
+        echo -e "${YELLOW}[rollback] 恢复旧版本...${NC}"
153
+        ssh "${SERVER_USER}@${SERVER_HOST}" "
154
+            cp ${REMOTE_DIR}/backup/${BACKUP_NAME} ${REMOTE_DIR}/${APP_NAME}.jar
155
+            systemctl start ${APP_NAME}
156
+        "
157
+        sleep 3
158
+
159
+        if ssh "${SERVER_USER}@${SERVER_HOST}" "bash ${REMOTE_DIR}/bin/health-check.sh localhost 8887 30"; then
160
+            echo -e "${GREEN}[rollback] 回滚成功,旧版本已恢复${NC}"
161
+        else
162
+            echo -e "${RED}[rollback] 回滚后健康检查仍失败,请手动排查!${NC}"
163
+        fi
164
+    else
165
+        echo -e "${RED}[rollback] 无备份可回滚,请手动修复!${NC}"
166
+    fi
167
+
168
+    exit 1
169
+fi

+ 48
- 0
deploy/health-check.sh Переглянути файл

@@ -0,0 +1,48 @@
1
+#!/bin/bash
2
+# IoT Platform 健康检查脚本
3
+# 用法: ./health-check.sh [host] [port] [max_wait_seconds]
4
+# 返回: 0=健康, 1=不健康
5
+#
6
+# 检查策略:
7
+#   1. 优先检查 /actuator/health (Spring Boot Actuator)
8
+#   2. 如果 Actuator 返回 404,降级为检查 TCP 端口是否监听
9
+#   3. 如果连端口都没监听,则判定为不健康
10
+
11
+HOST=${1:-localhost}
12
+PORT=${2:-8887}
13
+MAX_WAIT=${3:-30}
14
+HEALTH_URL="http://${HOST}:${PORT}/actuator/health"
15
+
16
+INTERVAL=2
17
+ATTEMPTS=$((MAX_WAIT / INTERVAL))
18
+
19
+echo "[health-check] 开始检查: ${HOST}:${PORT}, 最多等待 ${MAX_WAIT} 秒"
20
+
21
+for i in $(seq 1 $ATTEMPTS); do
22
+    # 策略1: Actuator 健康端点
23
+    RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" --connect-timeout 2 --max-time 3 "${HEALTH_URL}" 2>/dev/null)
24
+
25
+    if [ "$RESPONSE" = "200" ]; then
26
+        BODY=$(curl -s --connect-timeout 2 --max-time 3 "${HEALTH_URL}" 2>/dev/null)
27
+        if echo "$BODY" | grep -q '"status":"UP"'; then
28
+            echo "[health-check] Actuator 健康检查通过 (${i}/${ATTEMPTS})"
29
+            exit 0
30
+        fi
31
+    fi
32
+
33
+    # 策略2: 如果 Actuator 404,降级为端口监听检查
34
+    if [ "$RESPONSE" = "404" ]; then
35
+        if nc -z "${HOST}" "${PORT}" 2>/dev/null || \
36
+           ss -tlnp 2>/dev/null | grep -q ":${PORT} " || \
37
+           netstat -tlnp 2>/dev/null | grep -q ":${PORT} "; then
38
+            echo "[health-check] 端口监听检查通过 (${i}/${ATTEMPTS}) [Actuator 未启用,使用端口降级检查]"
39
+            exit 0
40
+        fi
41
+    fi
42
+
43
+    echo "[health-check] 第 ${i}/${ATTEMPTS} 次检查未通过 (HTTP ${RESPONSE}), ${INTERVAL} 秒后重试..."
44
+    sleep $INTERVAL
45
+done
46
+
47
+echo "[health-check] 健康检查失败: 服务未在 ${MAX_WAIT} 秒内就绪"
48
+exit 1

+ 32
- 0
deploy/iot-platform.service Переглянути файл

@@ -0,0 +1,32 @@
1
+[Unit]
2
+Description=IoT Platform Service
3
+After=network.target mysqld.service redis.service
4
+
5
+[Service]
6
+Type=simple
7
+User=root
8
+Group=root
9
+WorkingDirectory=/opt/iot-platform
10
+
11
+# JVM options
12
+Environment="JVM_OPTS=-server -Duser.timezone=Asia/Shanghai -Dfile.encoding=UTF-8 -Xms1g -Xmx2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/iot-platform/logs/heapdump.hprof -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
13
+
14
+# Load environment variables from .env (passwords, credentials)
15
+EnvironmentFile=/opt/iot-platform/.env
16
+
17
+ExecStart=/bin/bash -c 'exec java ${JVM_OPTS} -jar /opt/iot-platform/iot-platform.jar --spring.profiles.include=online'
18
+
19
+ExecStop=/bin/kill -SIGTERM $MAINPID
20
+ExecReload=/bin/kill -SIGUSR1 $MAINPID
21
+
22
+Restart=always
23
+RestartSec=10
24
+StartLimitInterval=60
25
+StartLimitBurst=3
26
+
27
+# Graceful shutdown: wait up to 60s for the app to stop
28
+TimeoutStopSec=60
29
+KillSignal=SIGTERM
30
+
31
+[Install]
32
+WantedBy=multi-user.target

+ 130
- 0
deploy/setup-server.sh Переглянути файл

@@ -0,0 +1,130 @@
1
+#!/bin/bash
2
+# IoT Platform 服务器初始化脚本
3
+# 在 online180 上执行,创建规范目录结构并迁移现有服务到 systemd
4
+
5
+set -e
6
+
7
+APP_NAME="iot-platform"
8
+INSTALL_DIR="/opt/${APP_NAME}"
9
+BACKUP_DIR="${INSTALL_DIR}/backup"
10
+LOG_DIR="${INSTALL_DIR}/logs"
11
+CONFIG_DIR="${INSTALL_DIR}/config"
12
+BIN_DIR="${INSTALL_DIR}/bin"
13
+
14
+OLD_JAR="/home/${APP_NAME}.jar"
15
+OLD_LOG_DIR="/home/logs"
16
+
17
+echo "========================================"
18
+echo "IoT Platform 服务器初始化"
19
+echo "========================================"
20
+
21
+# 1. 创建目录结构
22
+echo "[1/7] 创建目录结构..."
23
+mkdir -p "${INSTALL_DIR}" "${BACKUP_DIR}" "${LOG_DIR}" "${CONFIG_DIR}" "${BIN_DIR}"
24
+
25
+# 2. 迁移 JAR 文件
26
+echo "[2/7] 迁移 JAR 文件..."
27
+if [ -f "${OLD_JAR}" ]; then
28
+    cp -a "${OLD_JAR}" "${INSTALL_DIR}/${APP_NAME}.jar"
29
+    echo "  已复制: ${OLD_JAR} -> ${INSTALL_DIR}/${APP_NAME}.jar"
30
+else
31
+    echo "  警告: 未找到旧 JAR 文件 ${OLD_JAR}"
32
+fi
33
+
34
+# 3. 迁移日志
35
+echo "[3/7] 迁移日志..."
36
+if [ -d "${OLD_LOG_DIR}" ]; then
37
+    # 复制已有日志
38
+    cp -a "${OLD_LOG_DIR}"/* "${LOG_DIR}/" 2>/dev/null || true
39
+    echo "  已复制日志到 ${LOG_DIR}"
40
+    # 可选:备份旧日志目录,创建软链接
41
+    mv "${OLD_LOG_DIR}" "${OLD_LOG_DIR}.bak.$(date +%Y%m%d%H%M%S)" 2>/dev/null || true
42
+    ln -s "${LOG_DIR}" "${OLD_LOG_DIR}" 2>/dev/null || true
43
+    echo "  旧日志目录已备份并创建软链接"
44
+else
45
+    echo "  旧日志目录不存在,跳过迁移"
46
+fi
47
+
48
+# 4. 设置权限
49
+echo "[4/7] 设置权限..."
50
+chmod 755 "${INSTALL_DIR}" "${BACKUP_DIR}" "${LOG_DIR}" "${CONFIG_DIR}" "${BIN_DIR}"
51
+
52
+# 5. 安装 .env 文件(如果当前目录有)
53
+echo "[5/7] 安装环境变量文件..."
54
+if [ -f "/tmp/iot-platform.env" ]; then
55
+    cp "/tmp/iot-platform.env" "${INSTALL_DIR}/.env"
56
+    chmod 600 "${INSTALL_DIR}/.env"
57
+    echo "  已安装 .env"
58
+else
59
+    echo "  警告: 未找到 /tmp/iot-platform.env,请手动创建 ${INSTALL_DIR}/.env"
60
+fi
61
+
62
+# 6. 安装 systemd 服务
63
+echo "[6/7] 安装 systemd 服务..."
64
+if [ -f "/tmp/iot-platform.service" ]; then
65
+    cp "/tmp/iot-platform.service" /etc/systemd/system/iot-platform.service
66
+    chmod 644 /etc/systemd/system/iot-platform.service
67
+    systemctl daemon-reload
68
+    systemctl enable iot-platform
69
+    echo "  systemd 服务已安装并启用"
70
+else
71
+    echo "  错误: 未找到 /tmp/iot-platform.service"
72
+    exit 1
73
+fi
74
+
75
+# 7. 平滑切换:停止旧进程,启动 systemd
76
+echo "[7/7] 平滑切换到 systemd..."
77
+OLD_PID=$(ps -ef | grep java | grep "${APP_NAME}.jar" | grep -v grep | grep -v systemd | awk '{print $2}' | head -1)
78
+if [ -n "${OLD_PID}" ]; then
79
+    echo "  发现旧进程 PID=${OLD_PID}, 发送 SIGTERM..."
80
+    kill -TERM "${OLD_PID}" 2>/dev/null || true
81
+
82
+    # 等待最多 30 秒
83
+    for i in $(seq 1 30); do
84
+        if ! kill -0 "${OLD_PID}" 2>/dev/null; then
85
+            echo "  旧进程已停止"
86
+            break
87
+        fi
88
+        sleep 1
89
+    done
90
+
91
+    # 强制结束
92
+    if kill -0 "${OLD_PID}" 2>/dev/null; then
93
+        echo "  旧进程未响应,发送 SIGKILL..."
94
+        kill -KILL "${OLD_PID}" 2>/dev/null || true
95
+    fi
96
+else
97
+    echo "  未发现旧进程"
98
+fi
99
+
100
+echo ""
101
+echo "启动 systemd 服务..."
102
+systemctl start iot-platform
103
+
104
+sleep 3
105
+
106
+# 检查服务状态
107
+if systemctl is-active --quiet iot-platform; then
108
+    echo ""
109
+    echo "========================================"
110
+    echo "初始化成功!"
111
+    echo "========================================"
112
+    echo "服务状态:"
113
+    systemctl status iot-platform --no-pager
114
+    echo ""
115
+    echo "目录结构:"
116
+    ls -la "${INSTALL_DIR}"
117
+    echo ""
118
+    echo "常用命令:"
119
+    echo "  systemctl status iot-platform   # 查看状态"
120
+    echo "  journalctl -u iot-platform -f   # 实时日志"
121
+    echo "  systemctl stop iot-platform     # 停止服务"
122
+    echo "  systemctl restart iot-platform  # 重启服务"
123
+else
124
+    echo ""
125
+    echo "========================================"
126
+    echo "警告: 服务启动可能失败,请检查日志"
127
+    echo "========================================"
128
+    journalctl -u iot-platform --no-pager -n 50
129
+    exit 1
130
+fi

+ 22
- 0
iot-platform/.env.example Переглянути файл

@@ -0,0 +1,22 @@
1
+# IoT 平台环境变量配置示例
2
+# 生产环境部署前必须设置以下环境变量
3
+
4
+# MySQL 数据库(主从共用)
5
+export MYSQL_USERNAME=root
6
+export MYSQL_PASSWORD=your_mysql_password
7
+
8
+# Redis
9
+export REDIS_PASSWORD=your_redis_password
10
+
11
+# MQTT Broker
12
+export MQTT_USERNAME=your_mqtt_username
13
+export MQTT_PASSWORD=your_mqtt_password
14
+
15
+# TDengine 时序数据库
16
+export TDENGINE_USERNAME=root
17
+export TDENGINE_PASSWORD=your_tdengine_password
18
+
19
+# Druid 监控(可选,生产环境建议关闭)
20
+export DRUID_STAT_ENABLED=false
21
+export DRUID_USERNAME=admin
22
+export DRUID_PASSWORD=your_druid_password

+ 6
- 0
iot-platform/pom.xml Переглянути файл

@@ -125,6 +125,12 @@
125 125
             <optional>true</optional>
126 126
         </dependency>
127 127
 
128
+        <!-- Spring Boot Actuator -->
129
+        <dependency>
130
+            <groupId>org.springframework.boot</groupId>
131
+            <artifactId>spring-boot-starter-actuator</artifactId>
132
+        </dependency>
133
+
128 134
         <!-- 测试 -->
129 135
         <dependency>
130 136
             <groupId>org.springframework.boot</groupId>

+ 47
- 0
iot-platform/src/main/java/com/iot/platform/common/RedisKeys.java Переглянути файл

@@ -0,0 +1,47 @@
1
+package com.iot.platform.common;
2
+
3
+/**
4
+ * Redis key 命名空间常量,统一前缀避免 key 冲突。
5
+ * 格式: iot:{domain}:{identifier}
6
+ */
7
+public final class RedisKeys {
8
+
9
+    private static final String PREFIX = "iot";
10
+
11
+    /** 活跃设备集合 */
12
+    public static final String ACTIVE_DEVICES = PREFIX + ":dsb:active:devices";
13
+
14
+    /** 设备遥测数据 hash */
15
+    public static String deviceTelemetry(String controllerId, String metricName) {
16
+        return PREFIX + ":dsb:" + controllerId + ":" + metricName;
17
+    }
18
+
19
+    /** 工单坐标 hash */
20
+    public static String workorderCoordinate(String controllerId) {
21
+        return PREFIX + ":workorder:coordinate:" + controllerId;
22
+    }
23
+
24
+    /** 控制器状态 hash */
25
+    public static String controllerStatus(String controllerId) {
26
+        return PREFIX + ":controller:" + controllerId + ":status";
27
+    }
28
+
29
+    /** 控制器 topic hash */
30
+    public static String controllerTopic(String controllerId, String topicName) {
31
+        return PREFIX + ":controller:" + controllerId + ":" + topicName;
32
+    }
33
+
34
+    /** 控制器 cmd topic hash */
35
+    public static String controllerCmdTopic(String controllerId, String topicName) {
36
+        return PREFIX + ":controller:" + controllerId + "_cmd:" + topicName;
37
+    }
38
+
39
+    /** 控制器 fault topic hash */
40
+    public static String controllerFaultTopic(String controllerId, String topicName) {
41
+        return PREFIX + ":controller:" + controllerId + "_fault:" + topicName;
42
+    }
43
+
44
+    private RedisKeys() {
45
+        // 工具类,禁止实例化
46
+    }
47
+}

+ 1
- 0
iot-platform/src/main/java/com/iot/platform/mapper/SysrealtimeMapper.java Переглянути файл

@@ -12,4 +12,5 @@ public interface SysrealtimeMapper {
12 12
     void inserttables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("deviceId")String deviceId,@Param("timestamp")String timestamp,@Param("k")String k,@Param("v")Object v);
13 13
     void updatetables(@Param("tableName")String tableName,@Param("createTime")String createTime,@Param("v")Object v,@Param("timestamp")String timestamp,@Param("k")String k,@Param("deviceId")String deviceId);
14 14
     Integer selectkey(@Param("tableName")String tableName,@Param("k")String k);
15
+    List<String> selectAllKeys(@Param("tableName")String tableName);
15 16
 }

+ 177
- 0
iot-platform/src/main/java/com/iot/platform/mqtt/AbstractMqttConsumer.java Переглянути файл

@@ -0,0 +1,177 @@
1
+package com.iot.platform.mqtt;
2
+
3
+import com.iot.platform.config.IotProperties;
4
+import org.eclipse.paho.client.mqttv3.*;
5
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
6
+import org.slf4j.Logger;
7
+import org.slf4j.LoggerFactory;
8
+import org.springframework.beans.factory.annotation.Autowired;
9
+
10
+import javax.annotation.PostConstruct;
11
+import javax.annotation.PreDestroy;
12
+import java.net.InetSocketAddress;
13
+import java.net.Socket;
14
+import java.util.concurrent.ExecutorService;
15
+import java.util.concurrent.Executors;
16
+
17
+public abstract class AbstractMqttConsumer {
18
+
19
+    protected final Logger log = LoggerFactory.getLogger(getClass());
20
+
21
+    @Autowired
22
+    protected IotProperties iotProperties;
23
+
24
+    private String brokerUrl;
25
+    private String brokerHost;
26
+    private int brokerPort;
27
+    private String mqttUsername;
28
+    private String mqttPassword;
29
+
30
+    private static final int QOS = 1;
31
+    private static final int CONNECT_TIMEOUT = 3000;
32
+    private static final int RECONNECT_INTERVAL = 5000;
33
+    private static final int MAX_AVAILABILITY_CHECKS = 60;
34
+
35
+    protected MqttClient mqttClient;
36
+    protected MqttConnectOptions connOpts;
37
+    protected volatile boolean isMqttConnected = false;
38
+    protected final ExecutorService executorService = Executors.newSingleThreadExecutor();
39
+
40
+    protected abstract String getSubscribeTopic();
41
+
42
+    protected abstract String generateClientId();
43
+
44
+    protected abstract void handleMessage(String topic, String messageContent) throws Exception;
45
+
46
+    protected void onDestroy() {
47
+        // 子类可覆盖,用于关闭自定义资源
48
+    }
49
+
50
+    @PostConstruct
51
+    public void connectAndSubscribe() {
52
+        this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
53
+        String brokerAddr = this.brokerUrl.replace("tcp://", "");
54
+        int colonIdx = brokerAddr.lastIndexOf(':');
55
+        this.brokerHost = brokerAddr.substring(0, colonIdx);
56
+        this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
57
+        this.mqttUsername = iotProperties.getMqtt().getUsername();
58
+        this.mqttPassword = iotProperties.getMqtt().getPassword();
59
+
60
+        try {
61
+            checkServerAvailability();
62
+            String clientId = generateClientId();
63
+            mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
64
+            initMqttConnectOptions();
65
+            setMqttCallback();
66
+            connectAndSubscribeTopic();
67
+        } catch (MqttException | InterruptedException e) {
68
+            log.error("MQTT客户端初始化失败:", e);
69
+        }
70
+    }
71
+
72
+    private void checkServerAvailability() throws InterruptedException {
73
+        boolean serverAvailable = false;
74
+        int attempts = 0;
75
+        while (!serverAvailable && attempts < MAX_AVAILABILITY_CHECKS) {
76
+            try (Socket socket = new Socket()) {
77
+                socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
78
+                serverAvailable = true;
79
+                log.info("MQTT服务器连通性检测通过");
80
+            } catch (Exception e) {
81
+                attempts++;
82
+                log.warn("MQTT服务器不可达,5秒后重试... (attempt {}/{}})", attempts, MAX_AVAILABILITY_CHECKS);
83
+                Thread.sleep(RECONNECT_INTERVAL);
84
+            }
85
+        }
86
+        if (!serverAvailable) {
87
+            throw new IllegalStateException(
88
+                "MQTT服务器在" + MAX_AVAILABILITY_CHECKS + "次尝试后仍不可达: " + brokerHost + ":" + brokerPort);
89
+        }
90
+    }
91
+
92
+    private void initMqttConnectOptions() {
93
+        connOpts = new MqttConnectOptions();
94
+        connOpts.setCleanSession(true);
95
+        connOpts.setAutomaticReconnect(true);
96
+        connOpts.setConnectionTimeout(10);
97
+        connOpts.setUserName(mqttUsername);
98
+        connOpts.setPassword(mqttPassword.toCharArray());
99
+    }
100
+
101
+    private void setMqttCallback() {
102
+        mqttClient.setCallback(new MqttCallback() {
103
+            @Override
104
+            public void connectionLost(Throwable cause) {
105
+                log.error("MQTT连接断开,开始重连:" + cause.getMessage());
106
+                isMqttConnected = false;
107
+                reconnect();
108
+            }
109
+
110
+            @Override
111
+            public void messageArrived(String topic, MqttMessage message) throws Exception {
112
+                if (isMqttConnected) {
113
+                    executorService.submit(() -> {
114
+                        try {
115
+                            String messageContent = new String(message.getPayload(), "UTF-8");
116
+                            handleMessage(topic, messageContent);
117
+                        } catch (Exception e) {
118
+                            log.error("消息处理失败:", e);
119
+                        }
120
+                    });
121
+                }
122
+            }
123
+
124
+            @Override
125
+            public void deliveryComplete(IMqttDeliveryToken token) {
126
+                // 无需处理
127
+            }
128
+        });
129
+    }
130
+
131
+    private void connectAndSubscribeTopic() throws MqttException {
132
+        if (!mqttClient.isConnected()) {
133
+            IMqttToken connectToken = mqttClient.connectWithResult(connOpts);
134
+            if (connectToken.isComplete()) {
135
+                mqttClient.subscribe(getSubscribeTopic(), QOS);
136
+                isMqttConnected = true;
137
+                log.info("MQTT连接成功,已订阅主题:" + getSubscribeTopic());
138
+            }
139
+        }
140
+    }
141
+
142
+    public void reconnect() {
143
+        int maxReconnectAttempts = 3;
144
+        for (int attempt = 1; attempt <= maxReconnectAttempts; attempt++) {
145
+            try {
146
+                Thread.sleep(RECONNECT_INTERVAL);
147
+                if (mqttClient != null && !mqttClient.isConnected()) {
148
+                    mqttClient.connect(connOpts);
149
+                    mqttClient.subscribe(getSubscribeTopic(), QOS);
150
+                    isMqttConnected = true;
151
+                    log.info("MQTT重连成功(第" + attempt + "次尝试)");
152
+                    break;
153
+                }
154
+            } catch (MqttException | InterruptedException e) {
155
+                log.error("MQTT重连失败(第" + attempt + "次尝试):" + e.getMessage());
156
+                if (attempt == maxReconnectAttempts) {
157
+                    log.error("已达最大重连次数,停止重连");
158
+                }
159
+            }
160
+        }
161
+    }
162
+
163
+    @PreDestroy
164
+    public void disconnect() {
165
+        try {
166
+            if (mqttClient != null && mqttClient.isConnected()) {
167
+                mqttClient.disconnect();
168
+                mqttClient.close();
169
+                log.info("MQTT连接已断开");
170
+            }
171
+            executorService.shutdown();
172
+            onDestroy();
173
+        } catch (MqttException e) {
174
+            log.error("MQTT断开连接失败:", e);
175
+        }
176
+    }
177
+}

+ 55
- 277
iot-platform/src/main/java/com/iot/platform/mqtt/MqttFaultConsumer.java Переглянути файл

@@ -1,30 +1,14 @@
1 1
 package com.iot.platform.mqtt;
2
+
2 3
 import com.fasterxml.jackson.databind.ObjectMapper;
3
-import com.iot.platform.common.AjaxResult;
4
-import com.iot.platform.domain.SysController;
5 4
 import com.iot.platform.domain.SysDevice;
6 5
 import com.iot.platform.domain.SysFault;
7 6
 import com.iot.platform.service.*;
8 7
 import com.iot.platform.common.utils.NumericIdGenerator;
9
-import org.eclipse.paho.client.mqttv3.*;
10
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
11
-import com.iot.platform.config.IotProperties;
12 8
 import org.springframework.beans.factory.annotation.Autowired;
13
-import org.springframework.core.io.ClassPathResource;
14
-import org.springframework.scheduling.annotation.Async;
15 9
 import org.springframework.stereotype.Component;
16
-import org.springframework.web.bind.annotation.PostMapping;
17 10
 import org.springframework.web.client.RestTemplate;
18 11
 
19
-import javax.annotation.PostConstruct;
20
-import javax.annotation.PreDestroy;
21
-import javax.net.ssl.*;
22
-import java.io.InputStream;
23
-import java.net.InetSocketAddress;
24
-import java.net.Socket;
25
-import java.security.KeyStore;
26
-import java.security.cert.CertificateFactory;
27
-import java.security.cert.X509Certificate;
28 12
 import java.sql.SQLException;
29 13
 import java.time.LocalDate;
30 14
 import java.time.LocalDateTime;
@@ -32,293 +16,101 @@ import java.time.format.DateTimeFormatter;
32 16
 import java.util.*;
33 17
 import java.util.concurrent.ExecutorService;
34 18
 import java.util.concurrent.Executors;
35
-import org.slf4j.Logger;
36
-import org.slf4j.LoggerFactory;
37 19
 
38 20
 /**
39 21
  * 添加告警信息
40 22
  */
41 23
 @Component
42
-public class MqttFaultConsumer {
24
+public class MqttFaultConsumer extends AbstractMqttConsumer {
43 25
 
44
-    private static final Logger log = LoggerFactory.getLogger(MqttFaultConsumer.class);
45
-    private static ExecutorService threadPool= Executors.newCachedThreadPool();
46 26
     @Autowired
47 27
     public SysControllerService sysControllerService;
48 28
     @Autowired
49
-    public MqttDynamicConsumer messageListenerService2;
50
-    @Autowired
51 29
     public SysFaultService sysFaultService;
52 30
     @Autowired
53 31
     public SysrealtimeService sysrealtimeService;
54
-
55 32
     @Autowired
56
-    private IotProperties iotProperties;
57
-
58
-    // ========== 移除SSL,改用普通TCP连接 ==========
59
-    private String brokerUrl;
60
-    private String brokerHost;
61
-    private int brokerPort;
62
-
63
-    private static final int QOS = 1;
64
-    // 主题(保留原有主题)
65
-    private static final String SUBSCRIBE_TOPIC = "+/fault_prot";
66
-    private static final int CONNECT_TIMEOUT = 3000;
67
-    // 重试时间
68
-    private static final int RECONNECT_INTERVAL = 5000;
69
-
70
-    private String mqttUsername;
71
-    private String mqttPassword;
72
-    private static final ExecutorService mysqlWritePool = Executors.newFixedThreadPool(5); // 根据实际情况调整核心线程数
73
-
74
-    // 移除:SSL相关的CA_CERT_PATH常量(无需证书配置)
75
-    // 你的其他成员变量保持不变
33
+    public SysWorkorderService sysWorkorderService;
34
+    @Autowired
35
+    public SysAlarmService sysAlarmService;
76 36
     @Autowired
77
-    public TDengineService tDengineMapceshi2;
37
+    public NumericIdGenerator numericIdGenerator;
38
+    @Autowired
39
+    public TDegnineAlarm tDegnineAlarm;
78 40
     @Autowired
79 41
     private RestTemplate restTemplate;
80 42
 
43
+    private final ObjectMapper objectMapper = new ObjectMapper();
44
+    private static final ExecutorService mysqlWritePool = Executors.newFixedThreadPool(5);
81 45
 
82
-    private MqttClient mqttClient;
83
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
84
-    private boolean isMqttConnected = false;
85
-    private MqttConnectOptions connOpts;
86
-    // 注意:移除未定义的threadPool引用(避免编译错误)
87
-
88
-    @PostConstruct
89
-    public void connectAndSubscribe() {
90
-        this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
91
-        String brokerAddr = this.brokerUrl.replace("tcp://", "");
92
-        int colonIdx = brokerAddr.lastIndexOf(':');
93
-        this.brokerHost = brokerAddr.substring(0, colonIdx);
94
-        this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
95
-        this.mqttUsername = iotProperties.getMqtt().getUsername();
96
-        this.mqttPassword = iotProperties.getMqtt().getPassword();
97
-
98
-        try {
99
-            // 1. 检测普通MQTT服务器连通性(移除SSL相关,保留基础TCP检测)
100
-            checkServerAvailability();
101
-
102
-            // 2. 创建MQTT客户端(启用普通TCP的brokerUrl,修复原代码注释掉的创建逻辑)
103
-            String clientId = generateClientIdByOs();
104
-            mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); // 取消注释并修正
105
-
106
-            // 3. 初始化MQTT连接选项(移除SSL配置,新增账号密码)
107
-            initMqttConnectOptions();
108
-
109
-            // 4. 设置MQTT回调(保持核心逻辑,修复消息解析bug)
110
-            setMqttCallback();
111
-
112
-            // 5. 建立连接并订阅主题(保持不变)
113
-            connectAndSubscribeTopic();
114
-
115
-        } catch (MqttException | InterruptedException e) {
116
-            log.error("MQTT客户端初始化失败:", e);
117
-        }
118
-    }
119
-
120
-    /**
121
-     * 检测普通MQTT服务器连通性(移除SSL,仅检测TCP端口)
122
-     */
123
-    private void checkServerAvailability() throws InterruptedException {
124
-        boolean serverAvailable = false;
125
-        while (!serverAvailable) {
126
-            try (Socket socket = new Socket()) {
127
-                socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
128
-                serverAvailable = true;
129
-                log.info("普通MQTT服务器连通性检测通过");
130
-            } catch (Exception e) {
131
-                log.error("普通MQTT服务器不可达,5秒后重试...");
132
-                Thread.sleep(RECONNECT_INTERVAL);
133
-            }
134
-        }
46
+    private static final Map<String, String> KEY_MAPPING = new HashMap<>();
47
+    static {
48
+        KEY_MAPPING.put("timestamp", "devicetimestamp");
49
+        KEY_MAPPING.put("type", "devicetype");
50
+        KEY_MAPPING.put("desc", "devicedesc");
135 51
     }
136 52
 
137
-    /**
138
-     * 初始化MQTT连接选项(移除SSL配置,新增账号密码认证)
139
-     */
140
-    private void initMqttConnectOptions() {
141
-        connOpts = new MqttConnectOptions();
142
-        connOpts.setCleanSession(true);
143
-        connOpts.setAutomaticReconnect(true);
144
-        connOpts.setConnectionTimeout(10);
145
-
146
-        // ========== 新增:配置MQTT账号密码 ==========
147
-        connOpts.setUserName(mqttUsername);
148
-        connOpts.setPassword(mqttPassword.toCharArray()); // 密码要求传入char数组
149
-
150
-        // 移除:原有的SSL配置方法调用(configureSslAndStrictHostnameVerify())
53
+    @Override
54
+    protected String getSubscribeTopic() {
55
+        return "+/fault_prot";
151 56
     }
152 57
 
153
-    /**
154
-     * 按操作系统生成唯一ClientId(保持不变)
155
-     */
156
-    private String generateClientIdByOs() {
58
+    @Override
59
+    protected String generateClientId() {
157 60
         String osName = System.getProperty("os.name").toLowerCase();
158 61
         return osName.contains("windows") ? "mqttx_e216fbf1620" : "mqttx_e216fbf1621";
159 62
     }
160 63
 
161
-    /**
162
-     * 设置MQTT回调函数(保留核心逻辑,修复2个关键bug)
163
-     */
164
-    private void setMqttCallback() {
165
-        mqttClient.setCallback(new MqttCallback() {
166
-            @Override
167
-            public void connectionLost(Throwable cause) {
168
-                log.error("MQTT连接断开,开始重连:" + cause.getMessage());
169
-                isMqttConnected = false;
170
-                reconnect();
171
-            }
172
-
173
-            @Override
174
-            public void messageArrived(String topic, MqttMessage message) {
175
-                if (isMqttConnected) {
176
-                    executorService.submit(() -> {
177
-                        try {
178
-                            ObjectMapper objectMapper = new ObjectMapper();
179
-                            // 修复bug1:原代码先读取messageContent,却误用mqtt(message.toString())解析,改为使用messageContent
180
-                            // 正确读取消息负载(UTF-8编码,避免乱码)
181
-                            String messageContent = new String(message.getPayload(), "UTF-8");
182
-                            Map<String, Object> messageMap = objectMapper.readValue(messageContent, Map.class);
183
-                            insertTDegine(messageMap, topic);
184
-                            // 修复bug2:用正确的messageContent解析为SysFault对象
185
-                            SysFault sysFault = objectMapper.readValue(messageContent, SysFault.class);
186
-                            // 业务处理
187
-                            mysqlWritePool.submit(() -> {triggermethod(topic,sysFault);});
188
-                        } catch (Exception e) {
189
-                            log.error("消息处理失败:", e);
190
-                        }
191
-                    });
192
-                }
193
-            }
194
-
195
-            @Override
196
-            public void deliveryComplete(IMqttDeliveryToken token) {
197
-                // 消息投递完成回调(无需处理)
198
-            }
199
-        });
64
+    @Override
65
+    protected void handleMessage(String topic, String messageContent) throws Exception {
66
+        Map<String, Object> messageMap = objectMapper.readValue(messageContent, Map.class);
67
+        insertTDegine(messageMap, topic);
68
+        SysFault sysFault = objectMapper.readValue(messageContent, SysFault.class);
69
+        mysqlWritePool.submit(() -> triggermethod(topic, sysFault));
200 70
     }
201 71
 
202
-    /**
203
-     * 建立MQTT连接并订阅主题(保持不变)
204
-     */
205
-    private void connectAndSubscribeTopic() throws MqttException {
206
-        if (!mqttClient.isConnected()) {
207
-            IMqttToken connectToken = mqttClient.connectWithResult(connOpts);
208
-            if (connectToken.isComplete()) {
209
-                mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
210
-                isMqttConnected = true;
211
-                log.info("MQTT连接成功,已订阅主题:" + SUBSCRIBE_TOPIC);
212
-            }
213
-        }
214
-    }
215
-
216
-    /**
217
-     * MQTT重连逻辑(移除SSL日志标识,保持功能不变)
218
-     */
219
-    public void reconnect() {
220
-        int maxReconnectAttempts = 3;
221
-        for (int attempt = 1; attempt <= maxReconnectAttempts; attempt++) {
222
-            try {
223
-                Thread.sleep(RECONNECT_INTERVAL);
224
-                if (mqttClient != null && !mqttClient.isConnected()) {
225
-                    mqttClient.connect(connOpts);
226
-                    mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
227
-                    isMqttConnected = true;
228
-                    log.info("MQTT重连成功(第" + attempt + "次尝试)");
229
-                    break; // 重连成功后退出循环
230
-                }
231
-            } catch (MqttException | InterruptedException e) {
232
-                log.error("MQTT重连失败(第" + attempt + "次尝试):" + e.getMessage());
233
-                if (attempt == maxReconnectAttempts) {
234
-                    log.error("已达最大重连次数,停止重连");
235
-                }
236
-            }
237
-        }
238
-    }
239
-
240
-    /**
241
-     * 销毁时断开MQTT连接,关闭线程池(移除未定义的threadPool.shutdown())
242
-     */
243
-    @PreDestroy
244
-    public void disconnect() {
245
-        try {
246
-            if (mqttClient != null && mqttClient.isConnected()) {
247
-                mqttClient.disconnect();
248
-                mqttClient.close();
249
-                log.info("MQTT连接已断开");
250
-            }
251
-            executorService.shutdown();
252
-            // 移除:原代码中未定义的threadPool.shutdown();(避免编译错误)
253
-        } catch (MqttException e) {
254
-            log.error("MQTT断开连接失败:", e);
255
-        }
256
-    }
257
-
258
-    @Autowired
259
-    public SysWorkorderService sysWorkorderService;
260
-    @Autowired
261
-    public SysAlarmService sysAlarmService;
262
-    @Autowired
263
-    public NumericIdGenerator numericIdGenerator;
264
-    @Autowired
265
-    public TDegnineAlarm tDegnineAlarm;
266
-    private static final Map<String, String> KEY_MAPPING = new HashMap<>();
267
-    static {
268
-        KEY_MAPPING.put("timestamp", "devicetimestamp");
269
-        KEY_MAPPING.put("type", "devicetype");
270
-        KEY_MAPPING.put("desc", "devicedesc");
72
+    @Override
73
+    protected void onDestroy() {
74
+        mysqlWritePool.shutdown();
271 75
     }
272 76
 
273
-
274 77
     public void insertTDegine(Map<String, Object> weather, String topic) throws SQLException {
275 78
         LocalDate localDate = LocalDate.now();
276 79
         int year = localDate.getYear();
277
-        int month = localDate.getMonthValue(); // 直接获取1 - 12的月份,无需加1
278
-        // 超级表名称
80
+        int month = localDate.getMonthValue();
279 81
         String supertablename = topic.split("/")[0];
280
-        // 普通表名称
281
-        String table =topic.split("/")[0] + "_" + year + month;
282
-        // 创建新Map存储结果,避免修改原Map和并发修改异常
283
-        Map<String, Object> newMap = new HashMap<>();
82
+        String table = topic.split("/")[0] + "_" + year + month;
284 83
 
285
-        // 遍历原始Map,替换指定键名
84
+        Map<String, Object> newMap = new HashMap<>();
286 85
         for (Map.Entry<String, Object> entry : weather.entrySet()) {
287 86
             String originalKey = entry.getKey();
288 87
             Object value = entry.getValue();
289
-            // 判断是否需要替换键名
290 88
             if (KEY_MAPPING.containsKey(originalKey)) {
291
-                // 使用映射后的新键名
292 89
                 newMap.put(KEY_MAPPING.get(originalKey), value);
293 90
             } else {
294
-                // 不需要替换的键名,直接保留
295 91
                 newMap.put(originalKey, value);
296 92
             }
297 93
         }
298 94
         tDegnineAlarm.shibaihou(newMap, supertablename, table, topic.split("/")[1]);
299 95
     }
300 96
 
301
-
302
-    public void triggermethod(String topic,SysFault weather){
303
-        String deviceId=weather.getDevice_id();
304
-        String timestamp=weather.getTimestamp();
305
-        String type=weather.getType();
306
-        Integer code=weather.getCode();
307
-        String desc=weather.getDesc();
308
-
97
+    public void triggermethod(String topic, SysFault weather) {
98
+        String deviceId = weather.getDevice_id();
99
+        String timestamp = weather.getTimestamp();
100
+        String type = weather.getType();
101
+        String desc = weather.getDesc();
309 102
 
310 103
         LocalDate localDate = LocalDate.now();
311 104
         int year = localDate.getYear();
312
-        int month = localDate.getMonthValue(); // 直接获取1 - 12的月份,无需加1
105
+        int month = localDate.getMonthValue();
313 106
         String formattedMonth = String.format("%02d", month);
314 107
 
315
-        String controllerId=weather.getController_id();
316
-        String[] topics=topic.split("/");
108
+        String controllerId = weather.getController_id();
109
+        String[] topics = topic.split("/");
317 110
 
318
-        //判读是否有表的存在,如果有就不用管,如果没有就创建这个表(采用按照月去进行分表的操作)
319 111
         List<String> tablename = sysrealtimeService.selecttables();
320 112
         List<Boolean> a = new ArrayList<>();
321
-        String controllername=controllerId+year+formattedMonth+"_fault";
113
+        String controllername = controllerId + year + formattedMonth + "_fault";
322 114
 
323 115
         for (int i = 0; i < tablename.size(); i++) {
324 116
             a.add(tablename.get(i).equals(controllername));
@@ -327,41 +119,27 @@ public class MqttFaultConsumer {
327 119
             sysFaultService.createmessage(controllername);
328 120
         }
329 121
 
330
-        /**
331
-         * 修改消息发送修改为只要绑定这个设备或者说是控制器权限的人都可以收到这个故障的信息
332
-         * (或者说是将已读的状态修改为当前人的信息)
333
-         */
334
-        //记录当前经纬度(查询数据库信息)
335
-        if(type.equals("触发")){
336
-            //记录触发经纬度
337
-            SysDevice jingdu=sysControllerService.selectjingweidu(topics[0],"经度");
338
-            SysDevice weidu=sysControllerService.selectjingweidu(topics[0],"纬度");
339
-            //随机生成消息id
340
-            String companyid="GJ"+numericIdGenerator.nextId();
341
-            //当前时间
122
+        if ("触发".equals(type)) {
123
+            SysDevice jingdu = sysControllerService.selectjingweidu(topics[0], "经度");
124
+            SysDevice weidu = sysControllerService.selectjingweidu(topics[0], "纬度");
125
+            String companyid = "GJ" + numericIdGenerator.nextId();
342 126
             LocalDateTime currentTime = LocalDateTime.now();
343 127
             DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
344 128
             String currentTimeStr = currentTime.format(formatter);
345
-            sysAlarmService.insertalarm(controllername,companyid,desc,"0",currentTimeStr,"0",controllerId,deviceId,jingdu.getV(),weidu.getV());
346
-            //将信息添加到主库中
347
-            sysFaultService.insertfault(companyid,desc,"0",currentTimeStr,"0",controllerId,deviceId,jingdu.getV(),weidu.getV(),"");
348
-            //获取当前驾驶人
349
-            //sysWorkorderService.selectdeviceId(topics[0]);
350
-        }else if (type.equals("恢复")){
351
-            //记录触发经纬度
352
-            SysDevice jingdu=sysControllerService.selectjingweidu(topics[0],"经度");
353
-            SysDevice weidu=sysControllerService.selectjingweidu(topics[0],"纬度");
354
-            //随机生成消息id
355
-            String companyid="GJ"+numericIdGenerator.nextId();
356
-            //当前时间
129
+            sysAlarmService.insertalarm(controllername, companyid, desc, "0", currentTimeStr, "0", controllerId, deviceId, jingdu.getV(), weidu.getV());
130
+            sysFaultService.insertfault(companyid, desc, "0", currentTimeStr, "0", controllerId, deviceId, jingdu.getV(), weidu.getV(), "");
131
+        } else if ("恢复".equals(type)) {
132
+            SysDevice jingdu = sysControllerService.selectjingweidu(topics[0], "经度");
133
+            SysDevice weidu = sysControllerService.selectjingweidu(topics[0], "纬度");
134
+            String companyid = "GJ" + numericIdGenerator.nextId();
357 135
             LocalDateTime currentTime = LocalDateTime.now();
358 136
             DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
359 137
             String currentTimeStr = currentTime.format(formatter);
360
-            sysAlarmService.insertalarm(controllername,companyid,desc,"1",currentTimeStr,"0",controllerId,deviceId,jingdu.getV(),weidu.getV());
361
-            sysFaultService.updatefault("1","0",jingdu.getV(),weidu.getV(),desc,controllerId,deviceId,currentTimeStr);
362
-
138
+            sysAlarmService.insertalarm(controllername, companyid, desc, "1", currentTimeStr, "0", controllerId, deviceId, jingdu.getV(), weidu.getV());
139
+            sysFaultService.updatefault("1", "0", jingdu.getV(), weidu.getV(), desc, controllerId, deviceId, currentTimeStr);
363 140
         }
141
+
364 142
         String url = "https://esos-iot.com:9443/syscar/gaojing?controllerId=" + topics[0];
365 143
         restTemplate.postForObject(url, null, String.class);
366 144
     }
367
-}
145
+}

+ 27
- 227
iot-platform/src/main/java/com/iot/platform/mqtt/MqttGenericConsumer.java Переглянути файл

@@ -1,45 +1,21 @@
1 1
 package com.iot.platform.mqtt;
2
+
2 3
 import com.fasterxml.jackson.databind.ObjectMapper;
3 4
 import com.iot.platform.domain.ControllerData;
4 5
 import com.iot.platform.domain.topics;
5 6
 import com.iot.platform.service.SysControllerService;
6
-import com.iot.platform.service.TDengineService;
7
-import org.eclipse.paho.client.mqttv3.*;
8
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
9
-import com.iot.platform.config.IotProperties;
10 7
 import org.springframework.beans.factory.annotation.Autowired;
11
-import javax.annotation.PostConstruct;
12
-import javax.annotation.PreDestroy;
13
-import javax.net.ssl.*;
14
-import java.io.InputStream;
15
-import java.net.InetSocketAddress;
16
-import java.net.Socket;
17
-import java.security.KeyStore;
18
-import java.security.cert.CertificateFactory;
19
-import java.security.cert.X509Certificate;
20
-import java.util.*;
21
-import java.util.concurrent.ExecutorService;
22
-import java.util.concurrent.Executors;
23
-import org.springframework.core.io.ClassPathResource;
24 8
 import org.springframework.data.redis.core.StringRedisTemplate;
25
-import org.springframework.scheduling.annotation.Async;
26 9
 import org.springframework.stereotype.Component;
27
-import org.slf4j.Logger;
28
-import org.slf4j.LoggerFactory;
10
+
11
+import java.util.List;
29 12
 
30 13
 /**
31 14
  * 存储控制器数据
32 15
  */
33 16
 @Component
34
-public class MqttGenericConsumer {
17
+public class MqttGenericConsumer extends AbstractMqttConsumer {
35 18
 
36
-    private static final Logger log = LoggerFactory.getLogger(MqttGenericConsumer.class);
37
-    private static ExecutorService threadPool= Executors.newCachedThreadPool();
38
-    /**
39
-     *
40
-     * 添加mysql数据
41
-     *
42
-     */
43 19
     @Autowired
44 20
     private StringRedisTemplate stringRedisTemplate;
45 21
     @Autowired
@@ -47,197 +23,23 @@ public class MqttGenericConsumer {
47 23
     @Autowired
48 24
     public MqttDynamicConsumer messageListenerService2;
49 25
 
50
-    @Autowired
51
-    private IotProperties iotProperties;
52
-
53
-    // ========== 移除SSL,改用普通TCP连接 ==========
54
-    private String brokerUrl;
55
-    private String brokerHost;
56
-    private int brokerPort;
57
-
58
-    private static final int QOS = 1;
59
-    // 保留原有订阅主题
60
-    private static final String SUBSCRIBE_TOPIC = "+/generics";
61
-    private static final int CONNECT_TIMEOUT = 3000;
62
-    private static final int RECONNECT_INTERVAL = 5000;
63
-
64
-    private String mqttUsername;
65
-    private String mqttPassword;
26
+    private final ObjectMapper objectMapper = new ObjectMapper();
66 27
 
67
-    // 移除:SSL相关的CA_CERT_PATH常量(无需证书配置)
68
-    // 你的其他成员变量保持不变
69
-    @Autowired
70
-    public TDengineService tDengineMapceshi2;
71
-    private MqttClient mqttClient;
72
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
73
-    private boolean isMqttConnected = false;
74
-    private MqttConnectOptions connOpts;
75
-    private ObjectMapper objectMapper = new ObjectMapper();
76
-
77
-
78
-    // 移除:未定义的threadPool引用(避免编译错误)
79
-    @PostConstruct
80
-    public void connectAndSubscribe() {
81
-        this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
82
-        String brokerAddr = this.brokerUrl.replace("tcp://", "");
83
-        int colonIdx = brokerAddr.lastIndexOf(':');
84
-        this.brokerHost = brokerAddr.substring(0, colonIdx);
85
-        this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
86
-        this.mqttUsername = iotProperties.getMqtt().getUsername();
87
-        this.mqttPassword = iotProperties.getMqtt().getPassword();
88
-
89
-        try {
90
-            // 1. 检测普通MQTT服务器连通性(移除SSL,仅检测TCP端口)
91
-            checkServerAvailability();
92
-            // 2. 创建MQTT客户端(使用普通TCP的brokerUrl,保持逻辑不变)
93
-            String clientId = generateClientIdByOs();
94
-            mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
95
-            // 3. 初始化MQTT连接选项(移除SSL配置,新增账号密码)
96
-            initMqttConnectOptions();
97
-            // 4. 设置MQTT回调(保持核心逻辑,修复消息解析bug)
98
-            setMqttCallback();
99
-            // 5. 建立连接并订阅主题(保持逻辑不变,移除SSL日志标识)
100
-            connectAndSubscribeTopic();
101
-        } catch (MqttException | InterruptedException e) {
102
-            log.error("MQTT客户端初始化失败:", e);
103
-        }
28
+    @Override
29
+    protected String getSubscribeTopic() {
30
+        return "+/generics";
104 31
     }
105 32
 
106
-    /**
107
-     * 检测普通MQTT服务器连通性(移除SSL,仅验证TCP端口可达性)
108
-     */
109
-    private void checkServerAvailability() throws InterruptedException {
110
-        boolean serverAvailable = false;
111
-        while (!serverAvailable) {
112
-            try (Socket socket = new Socket()) {
113
-                // 改用普通brokerHost和brokerPort
114
-                socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
115
-                serverAvailable = true;
116
-                log.info("普通MQTT服务器连通性检测通过");
117
-            } catch (Exception e) {
118
-                log.error("普通MQTT服务器不可达,5秒后重试...");
119
-                Thread.sleep(RECONNECT_INTERVAL);
120
-            }
121
-        }
122
-    }
123
-
124
-    /**
125
-     * 初始化MQTT连接选项(移除SSL配置,新增账号密码认证)
126
-     */
127
-    private void initMqttConnectOptions() {
128
-        connOpts = new MqttConnectOptions();
129
-        connOpts.setCleanSession(true);
130
-        connOpts.setAutomaticReconnect(true);
131
-        connOpts.setConnectionTimeout(10);
132
-
133
-        // ========== 新增:配置MQTT账号密码 ==========
134
-        connOpts.setUserName(mqttUsername);
135
-        connOpts.setPassword(mqttPassword.toCharArray()); // 密码要求传入char数组
136
-
137
-        // 移除:原有的SSL配置方法调用(configureSslAndStrictHostnameVerify())
138
-    }
139
-
140
-    /**
141
-     * 按操作系统生成唯一ClientId(保持原有逻辑不变)
142
-     */
143
-    private String generateClientIdByOs() {
33
+    @Override
34
+    protected String generateClientId() {
144 35
         String osName = System.getProperty("os.name").toLowerCase();
145 36
         return osName.contains("windows") ? "mqttx_e216fbf1615" : "mqttx_e216fbf1616";
146 37
     }
147 38
 
148
-    /**
149
-     * 设置MQTT回调函数(保留核心逻辑,修复2个关键bug)
150
-     */
151
-    private void setMqttCallback() {
152
-        mqttClient.setCallback(new MqttCallback() {
153
-            @Override
154
-            public void connectionLost(Throwable cause) {
155
-                log.error("MQTT连接断开,开始重连:" + cause.getMessage());
156
-                isMqttConnected = false;
157
-                reconnect();
158
-            }
159
-
160
-            @Override
161
-            public void messageArrived(String topic, MqttMessage message) throws Exception {
162
-                if (isMqttConnected) {
163
-                    executorService.submit(() -> {
164
-                        try {
165
-                            ObjectMapper objectMapper = new ObjectMapper();
166
-                            // 修复bug1:原代码先读取正确的messageContent,却误用mqtt(message.toString())解析
167
-                            // 正确读取消息负载(UTF-8编码,避免乱码)
168
-                            String messageContent = new String(message.getPayload(), "UTF-8");
169
-                            // 修复bug2:用正确的messageContent解析为ControllerData对象
170
-                            ControllerData controllerData = objectMapper.readValue(messageContent, ControllerData.class);
171
-                            // 业务处理
172
-                            triggermethod(controllerData);
173
-                        } catch (Exception e) {
174
-                            log.error("消息处理失败:", e);
175
-                        }
176
-                    });
177
-                }
178
-            }
179
-
180
-            @Override
181
-            public void deliveryComplete(IMqttDeliveryToken token) {
182
-                // 消息投递完成回调(无需处理)
183
-            }
184
-        });
185
-    }
186
-
187
-    /**
188
-     * 建立MQTT连接并订阅主题(保持逻辑不变,移除SSL日志标识)
189
-     */
190
-    private void connectAndSubscribeTopic() throws MqttException {
191
-        if (!mqttClient.isConnected()) {
192
-            IMqttToken connectToken = mqttClient.connectWithResult(connOpts);
193
-            if (connectToken.isComplete()) {
194
-                mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
195
-                isMqttConnected = true;
196
-                log.info("MQTT连接成功,已订阅主题:" + SUBSCRIBE_TOPIC);
197
-            }
198
-        }
199
-    }
200
-
201
-    /**
202
-     * MQTT重连逻辑(移除SSL日志标识,保持功能不变)
203
-     */
204
-    public void reconnect() {
205
-        int maxReconnectAttempts = 3;
206
-        for (int attempt = 1; attempt <= maxReconnectAttempts; attempt++) {
207
-            try {
208
-                Thread.sleep(RECONNECT_INTERVAL);
209
-                if (mqttClient != null && !mqttClient.isConnected()) {
210
-                    mqttClient.connect(connOpts);
211
-                    mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
212
-                    isMqttConnected = true;
213
-                    log.info("MQTT重连成功(第" + attempt + "次尝试)");
214
-                    break; // 重连成功后退出循环
215
-                }
216
-            } catch (MqttException | InterruptedException e) {
217
-                log.error("MQTT重连失败(第" + attempt + "次尝试):" + e.getMessage());
218
-                if (attempt == maxReconnectAttempts) {
219
-                    log.error("已达最大重连次数,停止重连");
220
-                }
221
-            }
222
-        }
223
-    }
224
-
225
-    /**
226
-     * 销毁时断开MQTT连接,关闭线程池(移除未定义的threadPool.shutdown())
227
-     */
228
-    @PreDestroy
229
-    public void disconnect() {
230
-        try {
231
-            if (mqttClient != null && mqttClient.isConnected()) {
232
-                mqttClient.disconnect();
233
-                mqttClient.close();
234
-                log.info("MQTT连接已断开");
235
-            }
236
-            executorService.shutdown();
237
-            // 移除:原代码中未定义的threadPool.shutdown();(避免编译错误)
238
-        } catch (MqttException e) {
239
-            log.error("MQTT断开连接失败:", e);
240
-        }
39
+    @Override
40
+    protected void handleMessage(String topic, String messageContent) throws Exception {
41
+        ControllerData controllerData = objectMapper.readValue(messageContent, ControllerData.class);
42
+        triggermethod(controllerData);
241 43
     }
242 44
 
243 45
     public void triggermethod(ControllerData weather) throws Exception {
@@ -247,40 +49,38 @@ public class MqttGenericConsumer {
247 49
         List<topics> topics = weather.getTopics();
248 50
         List<topics> cmdtopics = weather.getCmd_topics();
249 51
         topics faultprot = weather.getFault_prot();
250
-        //需要检索全部的数据是否存在,如果存储就进行修改,如果不存在就进行添加
251
-        Integer controllercountcount=0;
52
+
53
+        Integer controllercountcount = 0;
252 54
         for (topics topicsMap : topics) {
253 55
             Integer count = sysControllerService.selectsyscontrollercount(topicsMap.getPath());
254 56
             if (count <= 0) {
255 57
                 stringRedisTemplate.persist(controllerId);
256
-                stringRedisTemplate.opsForHash().put(controllerId+":"+topicsMap.getName(), "path", topicsMap.getPath());
257
-                //将数据存储到redis
58
+                stringRedisTemplate.opsForHash().put(controllerId + ":" + topicsMap.getName(), "path", topicsMap.getPath());
258 59
                 sysControllerService.insertsyscontroller(controllerId, timestamp, fleetId, topicsMap.getName(), topicsMap.getPath());
259 60
                 controllercountcount++;
260 61
             }
261 62
         }
262
-        Integer controllercountcmdcount=0;
63
+
64
+        Integer controllercountcmdcount = 0;
263 65
         for (topics cmdtopicsMap : cmdtopics) {
264
-            //将数据存储到redis
265 66
             Integer count = sysControllerService.selectsyscontrollercountcmd(cmdtopicsMap.getPath());
266
-            if (count<=0) {
267
-                stringRedisTemplate.opsForHash().put(controllerId+"_cmd:"+cmdtopicsMap.getName(), "path", cmdtopicsMap.getPath());
67
+            if (count <= 0) {
68
+                stringRedisTemplate.opsForHash().put(controllerId + "_cmd:" + cmdtopicsMap.getName(), "path", cmdtopicsMap.getPath());
268 69
                 stringRedisTemplate.persist(controllerId);
269 70
                 sysControllerService.insertsyscontrollercmd(controllerId, timestamp, fleetId, cmdtopicsMap.getName(), cmdtopicsMap.getPath());
270 71
                 controllercountcmdcount++;
271 72
             }
272 73
         }
273
-        Integer count =sysControllerService.selectsyscontrollercountfault(faultprot.getPath());
274
-        if (count<=0){
275
-            stringRedisTemplate.opsForHash().put(controllerId+"_fault:"+faultprot.getName(), "path", faultprot.getPath());
74
+
75
+        Integer count = sysControllerService.selectsyscontrollercountfault(faultprot.getPath());
76
+        if (count <= 0) {
77
+            stringRedisTemplate.opsForHash().put(controllerId + "_fault:" + faultprot.getName(), "path", faultprot.getPath());
276 78
             sysControllerService.insertsyscontrollerfault(controllerId, timestamp, fleetId, faultprot.getName(), faultprot.getPath());
277 79
             stringRedisTemplate.persist(controllerId);
278 80
         }
279
-        //如果控制器中的数据出现更新情况,就触发该方法进行重连
280
-        if (controllercountcount>0) {
281
-            //关闭连接线程
81
+
82
+        if (controllercountcount > 0) {
282 83
             messageListenerService2.destroy();
283
-            //重新建立连接
284 84
             messageListenerService2.initMqttConnection();
285 85
         }
286 86
     }

+ 25
- 229
iot-platform/src/main/java/com/iot/platform/mqtt/MqttStatusConsumer.java Переглянути файл

@@ -1,269 +1,65 @@
1 1
 package com.iot.platform.mqtt;
2
+
2 3
 import com.fasterxml.jackson.databind.ObjectMapper;
3 4
 import com.iot.platform.service.SysControllerService;
4 5
 import com.iot.platform.service.SysStatusService;
5
-import com.iot.platform.service.TDengineService;
6
-import org.eclipse.paho.client.mqttv3.*;
7
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
8
-import com.iot.platform.config.IotProperties;
9 6
 import org.springframework.beans.factory.annotation.Autowired;
10
-import org.springframework.core.io.ClassPathResource;
11 7
 import org.springframework.data.redis.core.StringRedisTemplate;
12
-import org.springframework.scheduling.annotation.Async;
13 8
 import org.springframework.stereotype.Component;
14 9
 
15
-import javax.annotation.PostConstruct;
16
-import javax.annotation.PreDestroy;
17
-import javax.net.ssl.*;
18
-import java.io.IOException;
19
-import java.io.InputStream;
20
-import java.net.InetSocketAddress;
21
-import java.net.Socket;
22
-import java.security.KeyStore;
23
-import java.security.cert.CertificateFactory;
24
-import java.security.cert.X509Certificate;
25 10
 import java.time.LocalDateTime;
26 11
 import java.time.format.DateTimeFormatter;
27
-import java.util.Collection;
28
-import java.util.List;
29 12
 import java.util.Map;
30
-import java.util.concurrent.ExecutorService;
31
-import java.util.concurrent.Executors;
32
-import org.slf4j.Logger;
33
-import org.slf4j.LoggerFactory;
34 13
 
35 14
 /**
36 15
  * 存储控制器状态数据
37 16
  */
38 17
 @Component
39
-public class MqttStatusConsumer {
40
-
41
-    private static final Logger log = LoggerFactory.getLogger(MqttStatusConsumer.class);
18
+public class MqttStatusConsumer extends AbstractMqttConsumer {
42 19
 
43
-    private static ExecutorService threadPool= Executors.newCachedThreadPool();
44 20
     @Autowired
45 21
     private StringRedisTemplate stringRedisTemplate;
46 22
     @Autowired
47 23
     public SysControllerService sysControllerService;
48 24
     @Autowired
49
-    public MqttDynamicConsumer messageListenerService2;
50
-    @Autowired
51 25
     public SysStatusService sysStatusService;
52 26
 
53
-    @Autowired
54
-    private IotProperties iotProperties;
55
-
56
-    // 移除SSL,改用普通TCP连接(对应mqtt://,端口默认1883,可根据你的实际服务端修改)
57
-    private String brokerUrl;
58
-    private String brokerHost;
59
-    private int brokerPort;
60
-    private static final int QOS = 1;
61
-    // 主题
62
-    private static final String SUBSCRIBE_TOPIC = "+/status";
63
-    private static final int CONNECT_TIMEOUT = 3000;
64
-    // 重试时间
65
-    private static final int RECONNECT_INTERVAL = 5000;
66
-
67
-    private String mqttUsername;
68
-    private String mqttPassword;
69
-
70
-    // 你的其他成员变量保持不变
71
-    @Autowired
72
-    public TDengineService tDengineMapceshi2;
73
-    private MqttClient mqttClient;
74
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
75
-    private boolean isMqttConnected = false;
76
-    private MqttConnectOptions connOpts;
77
-    // 注意:移除了threadPool的定义(原代码@PreDestroy中存在,若未定义需补充或删除,此处暂删除多余引用)
78
-
79
-    @PostConstruct
80
-    public void connectAndSubscribe() {
81
-        this.brokerUrl = iotProperties.getMqtt().getBrokerUrl();
82
-        String brokerAddr = this.brokerUrl.replace("tcp://", "");
83
-        int colonIdx = brokerAddr.lastIndexOf(':');
84
-        this.brokerHost = brokerAddr.substring(0, colonIdx);
85
-        this.brokerPort = Integer.parseInt(brokerAddr.substring(colonIdx + 1));
86
-        this.mqttUsername = iotProperties.getMqtt().getUsername();
87
-        this.mqttPassword = iotProperties.getMqtt().getPassword();
27
+    private final ObjectMapper objectMapper = new ObjectMapper();
88 28
 
89
-        try {
90
-            // 1. 检测普通MQTT服务器连通性(移除SSL相关,保留基础连通性检测)
91
-            checkServerAvailability();
92
-
93
-            // 2. 创建MQTT客户端(直接使用普通TCP的brokerUrl)
94
-            String clientId = generateClientIdByOs();
95
-            mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
96
-
97
-            // 3. 初始化MQTT连接选项(已移除SSL配置,新增账号密码)
98
-            initMqttConnectOptions();
99
-
100
-            // 4. 设置MQTT回调(保持不变,仅修复消息处理中的小bug)
101
-            setMqttCallback();
102
-
103
-            // 5. 建立连接并订阅主题(保持不变)
104
-            connectAndSubscribeTopic();
105
-
106
-        } catch (MqttException | InterruptedException e) {
107
-            log.error("MQTT客户端初始化失败:", e);
108
-        }
109
-    }
110
-
111
-    /**
112
-     * 检测普通MQTT服务器连通性(移除SSL相关,仅检测TCP端口)
113
-     */
114
-    private void checkServerAvailability() throws InterruptedException {
115
-        boolean serverAvailable = false;
116
-        while (!serverAvailable) {
117
-            try (Socket socket = new Socket()) {
118
-                socket.connect(new InetSocketAddress(brokerHost, brokerPort), CONNECT_TIMEOUT);
119
-                serverAvailable = true;
120
-                log.info("普通MQTT服务器连通性检测通过");
121
-            } catch (Exception e) {
122
-                log.error("普通MQTT服务器不可达,5秒后重试...");
123
-                Thread.sleep(RECONNECT_INTERVAL);
124
-            }
125
-        }
29
+    @Override
30
+    protected String getSubscribeTopic() {
31
+        return "+/status";
126 32
     }
127 33
 
128
-    /**
129
-     * 初始化MQTT连接选项(移除SSL配置,新增账号密码认证)
130
-     */
131
-    private void initMqttConnectOptions() {
132
-        connOpts = new MqttConnectOptions();
133
-        connOpts.setCleanSession(true);
134
-        connOpts.setAutomaticReconnect(true);
135
-        connOpts.setConnectionTimeout(10);
136
-
137
-        // ========== 新增:配置MQTT账号密码 ==========
138
-        connOpts.setUserName(mqttUsername);
139
-        connOpts.setPassword(mqttPassword.toCharArray()); // 密码要求传入char数组
140
-
141
-        // 移除:原有的SSL配置方法调用(configureSslAndStrictHostnameVerify())
142
-    }
143
-
144
-    /**
145
-     * 按操作系统生成唯一ClientId(保持不变)
146
-     */
147
-    private String generateClientIdByOs() {
34
+    @Override
35
+    protected String generateClientId() {
148 36
         String osName = System.getProperty("os.name").toLowerCase();
149 37
         return osName.contains("windows") ? "mqttx_e216fbf1613" : "mqttx_e216fbf1614";
150 38
     }
151 39
 
152
-    /**
153
-     * 设置MQTT回调函数(保持不变,修复消息处理中的bug:误用mqtt变量,应使用messageContent)
154
-     */
155
-    private void setMqttCallback() {
156
-        mqttClient.setCallback(new MqttCallback() {
157
-            @Override
158
-            public void connectionLost(Throwable cause) {
159
-                log.error("MQTT连接断开,开始重连:" + cause.getMessage());
160
-                isMqttConnected = false;
161
-                reconnect();
162
-            }
163
-
164
-            @Override
165
-            public void messageArrived(String topic, MqttMessage message) throws Exception {
166
-                if (isMqttConnected) {
167
-                    executorService.submit(() -> {
168
-                        try {
169
-                            // 修复bug:原代码先定义messageContent,后误用mqtt变量(未赋值,实际是message.toString())
170
-                            ObjectMapper objectMapper = new ObjectMapper();
171
-                            // 正确读取消息负载(UTF-8编码)
172
-                            String messageContent = new String(message.getPayload(), "UTF-8");
173
-                            // 改用messageContent解析,避免空指针或解析错误
174
-                            Map<String, Object> listOfMaps = objectMapper.readValue(messageContent, Map.class);
175
-                            // 业务处理
176
-                            triggermethod(listOfMaps);
177
-                        } catch (Exception e) {
178
-                            log.error("消息处理失败:", e);
179
-                        }
180
-                    });
181
-                }
182
-            }
183
-
184
-            @Override
185
-            public void deliveryComplete(IMqttDeliveryToken token) {
186
-                // 消息投递完成回调(无需处理)
187
-            }
188
-        });
40
+    @Override
41
+    protected void handleMessage(String topic, String messageContent) throws Exception {
42
+        Map<String, Object> weather = objectMapper.readValue(messageContent, Map.class);
43
+        triggermethod(weather);
189 44
     }
190 45
 
191
-    /**
192
-     * 建立MQTT连接并订阅主题(保持不变)
193
-     */
194
-    private void connectAndSubscribeTopic() throws MqttException {
195
-        if (!mqttClient.isConnected()) {
196
-            IMqttToken connectToken = mqttClient.connectWithResult(connOpts);
197
-            if (connectToken.isComplete()) {
198
-                mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
199
-                isMqttConnected = true;
200
-                log.info("MQTT连接成功,已订阅主题:" + SUBSCRIBE_TOPIC);
201
-            }
202
-        }
203
-    }
46
+    public void triggermethod(Map<String, Object> weather) throws Exception {
47
+        String controllerId = weather.get("controller_id").toString();
48
+        String fleetId = weather.get("fleet_id").toString();
49
+        String status = weather.get("status").toString();
204 50
 
205
-    /**
206
-     * MQTT重连逻辑(保持不变)
207
-     */
208
-    public void reconnect() {
209
-        int maxReconnectAttempts = 3;
210
-        for (int attempt = 1; attempt <= maxReconnectAttempts; attempt++) {
211
-            try {
212
-                Thread.sleep(RECONNECT_INTERVAL);
213
-                if (mqttClient != null && !mqttClient.isConnected()) {
214
-                    mqttClient.connect(connOpts);
215
-                    mqttClient.subscribe(SUBSCRIBE_TOPIC, QOS);
216
-                    isMqttConnected = true;
217
-                    log.info("MQTT重连成功(第" + attempt + "次尝试)");
218
-                    break; // 重连成功后退出循环
219
-                }
220
-            } catch (MqttException | InterruptedException e) {
221
-                log.error("MQTT重连失败(第" + attempt + "次尝试):" + e.getMessage());
222
-                if (attempt == maxReconnectAttempts) {
223
-                    log.error("已达最大重连次数,停止重连");
224
-                }
225
-            }
226
-        }
227
-    }
228
-
229
-    /**
230
-     * 销毁时断开MQTT连接,关闭线程池(移除threadPool多余引用)
231
-     */
232
-    @PreDestroy
233
-    public void disconnect() {
234
-        try {
235
-            if (mqttClient != null && mqttClient.isConnected()) {
236
-                mqttClient.disconnect();
237
-                mqttClient.close();
238
-                log.info("MQTT连接已断开");
239
-            }
240
-            executorService.shutdown();
241
-            // 移除:原代码中的threadPool.shutdown();(若未定义该变量,直接删除,避免编译错误)
242
-        } catch (MqttException e) {
243
-            log.error("MQTT断开连接失败:", e);
244
-        }
245
-    }
246
-
247
-//    @Async
248
-    public void triggermethod(Map<String,Object> weather) throws Exception {
249
-        String controllerId=weather.get("controller_id").toString();
250
-        String fleetId=weather.get("fleet_id").toString();
251
-        String status=weather.get("status").toString();
252
-        //当前时间
253 51
         LocalDateTime currentTime = LocalDateTime.now();
254 52
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
255 53
         String currentTimeStr = currentTime.format(formatter);
256
-        //添加数据到超级表中
257
-        stringRedisTemplate.opsForHash().put(controllerId + "status:", "fleet_id",weather.get("fleet_id").toString());
258
-        stringRedisTemplate.opsForHash().put(controllerId + "status:", "status",weather.get("status").toString());
259
-        //查询数据库中是否存在该条数据
260
-        Integer count=sysStatusService.selectstatuscount(controllerId);
261
-        if (count<=0){
262
-            //添加数据到mysql数据库中
263
-            sysStatusService.insertsysstatus(controllerId,fleetId,status,currentTimeStr);
264
-        }else{
265
-            sysStatusService.updatestatus(controllerId,fleetId,status,currentTimeStr);
54
+
55
+        stringRedisTemplate.opsForHash().put(controllerId + "status:", "fleet_id", fleetId);
56
+        stringRedisTemplate.opsForHash().put(controllerId + "status:", "status", status);
57
+
58
+        Integer count = sysStatusService.selectstatuscount(controllerId);
59
+        if (count <= 0) {
60
+            sysStatusService.insertsysstatus(controllerId, fleetId, status, currentTimeStr);
61
+        } else {
62
+            sysStatusService.updatestatus(controllerId, fleetId, status, currentTimeStr);
266 63
         }
267 64
     }
268
-
269 65
 }

+ 32
- 39
iot-platform/src/main/java/com/iot/platform/service/SysrealtimeService.java Переглянути файл

@@ -1,71 +1,64 @@
1 1
 package com.iot.platform.service;
2 2
 
3 3
 import com.iot.platform.mapper.SysrealtimeMapper;
4
-import org.apache.ibatis.annotations.Param;
4
+import org.apache.commons.lang3.StringUtils;
5 5
 import org.slf4j.Logger;
6 6
 import org.slf4j.LoggerFactory;
7 7
 import org.springframework.stereotype.Service;
8 8
 
9 9
 import javax.annotation.Resource;
10
-import java.sql.Connection;
11
-import java.sql.DriverManager;
12
-import java.sql.SQLException;
13 10
 import java.util.List;
11
+import java.util.regex.Pattern;
14 12
 
15 13
 @Service
16 14
 public class SysrealtimeService {
17 15
     private static final Logger log = LoggerFactory.getLogger(SysrealtimeService.class);
16
+    private static final Pattern TABLE_NAME_PATTERN = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");
17
+    private static final int MAX_TABLE_NAME_LENGTH = 64;
18 18
 
19 19
     @Resource
20 20
     public SysrealtimeMapper sysrealtimeMapper;
21 21
 
22 22
     public void createrealtime(String tableName) {
23
+        validateTableName(tableName);
23 24
         log.info("准备创建表: [{}]", tableName);
24
-        log.info("执行建表SQL前,先验证数据库连接...");
25
-
26
-        // 直接用JDBC测试连接和建表
27
-        try (Connection conn = getConnection()) {
28
-            String sql = "CREATE TABLE IF NOT EXISTS `" + tableName + "` (" +
29
-                    "create_time VARCHAR(255) NOT NULL COMMENT '时间戳'," +
30
-                    "device_id VARCHAR(255) NOT NULL COMMENT '设备id'," +
31
-                    "timestamp VARCHAR(255) NOT NULL COMMENT '时间戳'," +
32
-                    "k VARCHAR(255) NOT NULL COMMENT 'key'," +
33
-                    "v VARCHAR(255) NOT NULL COMMENT '值'," +
34
-                    "INDEX idx_device_id (device_id)," +
35
-                    "INDEX idx_device_create_time (device_id, create_time)," +
36
-                    "INDEX idx_k (k)" +
37
-                    ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='实时数据信息'";
38
-            log.info("SQL: {}", sql);
39
-            conn.createStatement().executeUpdate(sql);
40
-            log.info("表[{}]创建成功", tableName);
41
-        } catch (Exception e) {
42
-            log.error("JDBC建表[{}]失败: {}", tableName, e.getMessage());
43
-            Throwable cause = e;
44
-            while (cause.getCause() != null) cause = cause.getCause();
45
-            log.error("根因: {}", cause.getMessage());
46
-            throw new RuntimeException("建表失败", e);
47
-        }
48
-    }
49
-
50
-    private Connection getConnection() throws SQLException {
51
-        String url = "jdbc:mysql://47.104.204.180:3306/data?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8";
52
-        String user = "root";
53
-        String password = "Zhu059300()__";
54
-        return DriverManager.getConnection(url, user, password);
25
+        sysrealtimeMapper.createrealtime(tableName);
26
+        log.info("表[{}]创建成功", tableName);
55 27
     }
56 28
 
57
-    public List<String> selecttables(){
29
+    public List<String> selecttables() {
58 30
         return sysrealtimeMapper.selecttables();
59 31
     }
60
-    public void inserttables(String tableName,String createTime,String deviceId,String timestamp,String k,Object v){
32
+
33
+    public void inserttables(String tableName, String createTime, String deviceId, String timestamp, String k, Object v) {
34
+        validateTableName(tableName);
61 35
         sysrealtimeMapper.inserttables(tableName, createTime, deviceId, timestamp, k, v);
62 36
     }
63
-    public void updatetables(String tableName,String createTime,Object v,String timestamp,String k,String deviceId){
37
+
38
+    public void updatetables(String tableName, String createTime, Object v, String timestamp, String k, String deviceId) {
39
+        validateTableName(tableName);
64 40
         sysrealtimeMapper.updatetables(tableName, createTime, v, timestamp, k, deviceId);
65 41
     }
66 42
 
67
-    public Integer selectkey(String tableName,String k){
43
+    public Integer selectkey(String tableName, String k) {
44
+        validateTableName(tableName);
68 45
         return sysrealtimeMapper.selectkey(tableName, k);
69 46
     }
70 47
 
48
+    public List<String> selectAllKeys(String tableName) {
49
+        validateTableName(tableName);
50
+        return sysrealtimeMapper.selectAllKeys(tableName);
51
+    }
52
+
53
+    private void validateTableName(String tableName) {
54
+        if (StringUtils.isBlank(tableName)) {
55
+            throw new IllegalArgumentException("表名不能为空");
56
+        }
57
+        if (tableName.length() > MAX_TABLE_NAME_LENGTH) {
58
+            throw new IllegalArgumentException("表名过长,最大" + MAX_TABLE_NAME_LENGTH + "字符");
59
+        }
60
+        if (!TABLE_NAME_PATTERN.matcher(tableName).matches()) {
61
+            throw new IllegalArgumentException("非法表名: " + tableName);
62
+        }
63
+    }
71 64
 }

+ 63
- 31
iot-platform/src/main/java/com/iot/platform/task/VehicleSyncTask.java Переглянути файл

@@ -8,7 +8,10 @@ import com.iot.platform.service.*;
8 8
 import org.slf4j.Logger;
9 9
 import org.slf4j.LoggerFactory;
10 10
 import org.springframework.beans.factory.annotation.Autowired;
11
+import org.springframework.data.redis.core.RedisCallback;
12
+import org.springframework.data.redis.core.ScanOptions;
11 13
 import org.springframework.data.redis.core.StringRedisTemplate;
14
+import org.springframework.http.client.SimpleClientHttpRequestFactory;
12 15
 import org.springframework.scheduling.annotation.Scheduled;
13 16
 import org.springframework.stereotype.Component;
14 17
 import org.springframework.web.client.RestTemplate;
@@ -29,8 +32,6 @@ public class VehicleSyncTask {
29 32
     @Autowired
30 33
     private StringRedisTemplate stringRedisTemplate;
31 34
     @Autowired
32
-    private RestTemplate restTemplate;
33
-    @Autowired
34 35
     private SysrealtimeService sysrealtimeService;
35 36
     @Autowired
36 37
     public SysDeviceVoService sysDeviceVoService;
@@ -43,12 +44,21 @@ public class VehicleSyncTask {
43 44
     @Autowired
44 45
     public SysCompanyService sysCompanyService;
45 46
 
47
+    private final RestTemplate restTemplate;
48
+
49
+    public VehicleSyncTask() {
50
+        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
51
+        factory.setConnectTimeout(5000);
52
+        factory.setReadTimeout(10000);
53
+        this.restTemplate = new RestTemplate(factory);
54
+    }
55
+
46 56
     /**
47 57
      * 更新车辆的控制器信息
48 58
      * 30秒更新一次
49 59
      */
50 60
     @Scheduled(fixedDelay = 30000)
51
-    public void updatesyscar() {
61
+    public void updateSysCar() {
52 62
         List<SysCar> sysCarList = sysCarService.selectcontrollerId();
53 63
         for (SysCar sysCar : sysCarList) {
54 64
             if (sysCar.getControllerId() == null || sysCar.getControllerId().isEmpty()) {
@@ -58,39 +68,58 @@ public class VehicleSyncTask {
58 68
             SysDevice longitude = sysDeviceService.selectsysdevice(sysCar.getControllerId(), "经度");
59 69
 
60 70
             String redisKeyPattern = "workorder:coordinate:" + sysCar.getControllerId() + ":*";
61
-            Set<String> keys = stringRedisTemplate.keys(redisKeyPattern);
71
+            Set<String> keys = scanKeys(redisKeyPattern);
62 72
 
63 73
             if (keys == null || keys.isEmpty()) {
64
-                String redisKey = "workorder:coordinate:" + sysCar.getControllerId();
65
-                stringRedisTemplate.opsForHash().put(redisKey, "latitude", latitude.getV());
66
-                stringRedisTemplate.opsForHash().put(redisKey, "longitude", longitude.getV());
67
-                stringRedisTemplate.persist(redisKey);
68
-
69
-                String position = latitude.getV() + "," + longitude.getV();
70
-                sysCarService.updatecarposition(position, sysCar.getCarId());
71
-                String url = "https://esos-iot.com:9443/syscar/trigger?carId=" + sysCar.getCarId();
72
-                restTemplate.postForObject(url, null, String.class);
74
+                updateCarPosition(sysCar, latitude, longitude);
73 75
             } else {
74 76
                 for (String key : keys) {
75 77
                     Map<Object, Object> coordinateMap = stringRedisTemplate.opsForHash().entries(key);
76 78
                     if (coordinateMap.get("latitude").equals(latitude.getV()) && coordinateMap.get("longitude").equals(longitude.getV())) {
77 79
                         continue;
78 80
                     }
79
-                    String redisKey = "workorder:coordinate:" + sysCar.getControllerId();
80
-                    stringRedisTemplate.opsForHash().put(redisKey, "latitude", latitude.getV());
81
-                    stringRedisTemplate.opsForHash().put(redisKey, "longitude", longitude.getV());
82
-                    stringRedisTemplate.persist(redisKey);
83
-                    String position = latitude.getV() + "," + longitude.getV();
84
-                    sysCarService.updatecarposition(position, sysCar.getCarId());
85
-                    String url = "https://esos-iot.com:9443/syscar/trigger?carId=" + sysCar.getCarId();
86
-                    restTemplate.postForObject(url, null, String.class);
81
+                    updateCarPosition(sysCar, latitude, longitude);
87 82
                 }
88 83
             }
89 84
         }
90 85
     }
91 86
 
87
+    private void updateCarPosition(SysCar sysCar, SysDevice latitude, SysDevice longitude) {
88
+        String redisKey = "workorder:coordinate:" + sysCar.getControllerId();
89
+        stringRedisTemplate.opsForHash().put(redisKey, "latitude", latitude.getV());
90
+        stringRedisTemplate.opsForHash().put(redisKey, "longitude", longitude.getV());
91
+        stringRedisTemplate.persist(redisKey);
92
+
93
+        String position = latitude.getV() + "," + longitude.getV();
94
+        sysCarService.updatecarposition(position, sysCar.getCarId());
95
+        String url = "https://esos-iot.com:9443/syscar/trigger?carId=" + sysCar.getCarId();
96
+        try {
97
+            restTemplate.postForObject(url, null, String.class);
98
+        } catch (Exception e) {
99
+            log.warn("触发webhook失败 carId={}: {}", sysCar.getCarId(), e.getMessage());
100
+        }
101
+    }
102
+
103
+    private Set<String> scanKeys(String pattern) {
104
+        Set<String> keys = new HashSet<>();
105
+        ScanOptions options = ScanOptions.scanOptions().match(pattern).count(100).build();
106
+        try {
107
+            stringRedisTemplate.execute((RedisCallback<Void>) connection -> {
108
+                org.springframework.data.redis.core.Cursor<byte[]> cursor = connection.scan(options);
109
+                while (cursor.hasNext()) {
110
+                    keys.add(new String(cursor.next()));
111
+                }
112
+                cursor.close();
113
+                return null;
114
+            });
115
+        } catch (Exception e) {
116
+            log.error("Redis SCAN失败 pattern={}: {}", pattern, e.getMessage());
117
+        }
118
+        return keys;
119
+    }
120
+
92 121
     @Scheduled(fixedDelay = 30000)
93
-    public void insertdevice() {
122
+    public void insertDevice() {
94 123
         Set<String> activeKeys;
95 124
         try {
96 125
             activeKeys = stringRedisTemplate.opsForSet().members("DSB:active:devices");
@@ -103,6 +132,7 @@ public class VehicleSyncTask {
103 132
                 log.info("redis中无数据");
104 133
                 return;
105 134
             }
135
+            List<SysDeviceControl> sysDeviceControlList = sysDeviceControlService.selectdevice("device");
106 136
             for (String redisKey : activeKeys) {
107 137
                 Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
108 138
                 if (dataMap == null || dataMap.isEmpty()) {
@@ -115,7 +145,6 @@ public class VehicleSyncTask {
115 145
                     continue;
116 146
                 }
117 147
                 String controllerId = parts[1];
118
-                List<SysDeviceControl> sysDeviceControlList = sysDeviceControlService.selectdevice("device");
119 148
                 Integer count = sysDeviceVoService.selectcount(controllerId);
120 149
                 if (count != null && count > 0) {
121 150
                     StringBuilder keyvalue = new StringBuilder();
@@ -164,6 +193,7 @@ public class VehicleSyncTask {
164 193
     public void syncRedisToMySQL() {
165 194
         Set<String> activeKeys = stringRedisTemplate.opsForSet().members("DSB:active:devices");
166 195
         if (activeKeys == null || activeKeys.isEmpty()) return;
196
+
167 197
         for (String redisKey : activeKeys) {
168 198
             try {
169 199
                 Map<Object, Object> dataMap = stringRedisTemplate.opsForHash().entries(redisKey);
@@ -178,15 +208,14 @@ public class VehicleSyncTask {
178 208
                     continue;
179 209
                 }
180 210
                 String controllerId = parts[1];
211
+
181 212
                 try {
182 213
                     sysrealtimeService.createrealtime(controllerId);
183 214
                 } catch (Exception e) {
184
-                    Throwable cause = e;
185
-                    while (cause.getCause() != null) cause = cause.getCause();
186
-                    log.error("创建表失败: {} | {} | {}", controllerId, e.getMessage(), cause.getMessage());
187
-                    e.printStackTrace();
215
+                    log.error("创建表失败: {} | {}", controllerId, e.getMessage(), e);
188 216
                     continue;
189 217
                 }
218
+
190 219
                 String createTime = getStringValue(dataMap, "createTime");
191 220
                 String timestamp = getStringValue(dataMap, "timestamp");
192 221
                 String deviceId = getStringValue(dataMap, "device_id");
@@ -194,6 +223,9 @@ public class VehicleSyncTask {
194 223
                     continue;
195 224
                 }
196 225
 
226
+                List<String> existingKeys = sysrealtimeService.selectAllKeys(controllerId);
227
+                Set<String> existingKeySet = existingKeys != null ? new HashSet<>(existingKeys) : Collections.emptySet();
228
+
197 229
                 for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
198 230
                     String fieldKey = entry.getKey().toString();
199 231
                     if ("createTime".equals(fieldKey) || "timestamp".equals(fieldKey) || "device_id".equals(fieldKey)) {
@@ -201,15 +233,15 @@ public class VehicleSyncTask {
201 233
                     }
202 234
                     String fieldValue = getStringValue(dataMap, fieldKey);
203 235
                     if (fieldValue == null) continue;
204
-                    Integer count = sysrealtimeService.selectkey(controllerId, fieldKey);
205
-                    if (count != null && count > 0) {
236
+
237
+                    if (existingKeySet.contains(fieldKey)) {
206 238
                         sysrealtimeService.updatetables(controllerId, createTime, fieldValue, timestamp, fieldKey, deviceId);
207 239
                     } else {
208 240
                         sysrealtimeService.inserttables(controllerId, createTime, deviceId, timestamp, fieldKey, fieldValue);
209 241
                     }
210 242
                 }
211 243
             } catch (Exception e) {
212
-                log.error("同步设备失败: {} | {}", redisKey, e.getMessage());
244
+                log.error("同步设备失败: {} | {}", redisKey, e.getMessage(), e);
213 245
             }
214 246
         }
215 247
     }
@@ -224,7 +256,7 @@ public class VehicleSyncTask {
224 256
      * 根据公司去查询
225 257
      */
226 258
     @Scheduled(fixedDelay = 30000)
227
-    public void insertindicators() {
259
+    public void insertIndicators() {
228 260
         try {
229 261
             LocalDate today = LocalDate.now();
230 262
             DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

+ 7
- 7
iot-platform/src/main/resources/application-druid.yml Переглянути файл

@@ -7,15 +7,15 @@ spring:
7 7
             # 主库数据源
8 8
             master:
9 9
                 url: jdbc:mysql://47.104.204.180:3306/data?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
10
-                username: root
11
-                password: Zhu059300()__
10
+                username: ${MYSQL_USERNAME:root}
11
+                password: ${MYSQL_PASSWORD:Zhu059300()__}
12 12
             # 从库数据源
13 13
             slave:
14 14
                 # 从数据源开关/默认关闭
15 15
                 enabled: true
16 16
                 url: jdbc:mysql://47.104.204.180:3306/cnc?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
17
-                username: root
18
-                password: Zhu059300()__
17
+                username: ${MYSQL_USERNAME:root}
18
+                password: ${MYSQL_PASSWORD:Zhu059300()__}
19 19
             # 初始连接数
20 20
             initialSize: 5
21 21
             # 最小连接池数量
@@ -42,13 +42,13 @@ spring:
42 42
             webStatFilter:
43 43
                 enabled: true
44 44
             statViewServlet:
45
-                enabled: true
45
+                enabled: ${DRUID_STAT_ENABLED:false}
46 46
                 # 设置白名单,不填则允许所有访问
47 47
                 allow:
48 48
                 url-pattern: /druid/*
49 49
                 # 控制台管理用户名和密码
50
-                login-username: ruoyi
51
-                login-password: 123456
50
+                login-username: ${DRUID_USERNAME:ruoyi}
51
+                login-password: ${DRUID_PASSWORD:123456}
52 52
             filter:
53 53
                 stat:
54 54
                     enabled: true

+ 15
- 5
iot-platform/src/main/resources/application.yml Переглянути файл

@@ -32,7 +32,7 @@ spring:
32 32
     host: localhost
33 33
     port: 6379
34 34
     database: 0
35
-    password:
35
+    password: ${REDIS_PASSWORD:}
36 36
     timeout: 10s
37 37
     lettuce:
38 38
       pool:
@@ -52,13 +52,23 @@ pagehelper:
52 52
   supportMethodsArguments: true
53 53
   params: count=countSql
54 54
 
55
+# Actuator 配置
56
+management:
57
+  endpoints:
58
+    web:
59
+      exposure:
60
+        include: health,info
61
+  endpoint:
62
+    health:
63
+      show-details: always
64
+
55 65
 # IoT平台配置
56 66
 iot:
57 67
   mqtt:
58 68
     broker-url: tcp://47.104.204.180:1883
59
-    username: NjniyrEO
60
-    password: 2b577892f4824d466dbc323a1ee4dfe1902c55bb
69
+    username: ${MQTT_USERNAME:NjniyrEO}
70
+    password: ${MQTT_PASSWORD:2b577892f4824d466dbc323a1ee4dfe1902c55bb}
61 71
   tdengine:
62 72
     url: jdbc:TAOS://localhost:6030/
63
-    username: root
64
-    password: taosdata
73
+    username: ${TDENGINE_USERNAME:root}
74
+    password: ${TDENGINE_PASSWORD:taosdata}

+ 4
- 0
iot-platform/src/main/resources/mapper/SysrealtimeMapper.xml Переглянути файл

@@ -37,4 +37,8 @@
37 37
     <select id="selectkey" resultType="Integer">
38 38
         select COUNT(*) from `${tableName}` where k=#{k}
39 39
     </select>
40
+
41
+    <select id="selectAllKeys" resultType="String">
42
+        SELECT k FROM `${tableName}`
43
+    </select>
40 44
 </mapper>

+ 57
- 0
iot-platform/src/test/java/com/iot/platform/common/RedisKeysTest.java Переглянути файл

@@ -0,0 +1,57 @@
1
+package com.iot.platform.common;
2
+
3
+import org.junit.jupiter.api.DisplayName;
4
+import org.junit.jupiter.api.Test;
5
+
6
+import static org.assertj.core.api.Assertions.assertThat;
7
+
8
+class RedisKeysTest {
9
+
10
+    @Test
11
+    @DisplayName("ACTIVE_DEVICES 应包含统一前缀")
12
+    void activeDevices_containsPrefix() {
13
+        assertThat(RedisKeys.ACTIVE_DEVICES).isEqualTo("iot:dsb:active:devices");
14
+    }
15
+
16
+    @Test
17
+    @DisplayName("deviceTelemetry 应生成正确的 key 格式")
18
+    void deviceTelemetry_formatsCorrectly() {
19
+        String key = RedisKeys.deviceTelemetry("ctrl001", "temperature");
20
+        assertThat(key).isEqualTo("iot:dsb:ctrl001:temperature");
21
+    }
22
+
23
+    @Test
24
+    @DisplayName("workorderCoordinate 应生成正确的 key 格式")
25
+    void workorderCoordinate_formatsCorrectly() {
26
+        String key = RedisKeys.workorderCoordinate("ctrl001");
27
+        assertThat(key).isEqualTo("iot:workorder:coordinate:ctrl001");
28
+    }
29
+
30
+    @Test
31
+    @DisplayName("controllerStatus 应生成正确的 key 格式")
32
+    void controllerStatus_formatsCorrectly() {
33
+        String key = RedisKeys.controllerStatus("ctrl001");
34
+        assertThat(key).isEqualTo("iot:controller:ctrl001:status");
35
+    }
36
+
37
+    @Test
38
+    @DisplayName("controllerTopic 应生成正确的 key 格式")
39
+    void controllerTopic_formatsCorrectly() {
40
+        String key = RedisKeys.controllerTopic("ctrl001", "metrics");
41
+        assertThat(key).isEqualTo("iot:controller:ctrl001:metrics");
42
+    }
43
+
44
+    @Test
45
+    @DisplayName("controllerCmdTopic 应生成正确的 key 格式")
46
+    void controllerCmdTopic_formatsCorrectly() {
47
+        String key = RedisKeys.controllerCmdTopic("ctrl001", "cmd1");
48
+        assertThat(key).isEqualTo("iot:controller:ctrl001_cmd:cmd1");
49
+    }
50
+
51
+    @Test
52
+    @DisplayName("controllerFaultTopic 应生成正确的 key 格式")
53
+    void controllerFaultTopic_formatsCorrectly() {
54
+        String key = RedisKeys.controllerFaultTopic("ctrl001", "fault1");
55
+        assertThat(key).isEqualTo("iot:controller:ctrl001_fault:fault1");
56
+    }
57
+}

+ 24
- 0
iot-platform/src/test/java/com/iot/platform/mqtt/MqttGenericConsumerTest.java Переглянути файл

@@ -0,0 +1,24 @@
1
+package com.iot.platform.mqtt;
2
+
3
+import org.junit.jupiter.api.DisplayName;
4
+import org.junit.jupiter.api.Test;
5
+
6
+import static org.assertj.core.api.Assertions.assertThat;
7
+
8
+class MqttGenericConsumerTest {
9
+
10
+    @Test
11
+    @DisplayName("getSubscribeTopic 应返回 +/generics")
12
+    void getSubscribeTopic_returnsCorrectValue() {
13
+        MqttGenericConsumer consumer = new MqttGenericConsumer();
14
+        assertThat(consumer.getSubscribeTopic()).isEqualTo("+/generics");
15
+    }
16
+
17
+    @Test
18
+    @DisplayName("generateClientId 应包含 mqttx 前缀")
19
+    void generateClientId_containsPrefix() {
20
+        MqttGenericConsumer consumer = new MqttGenericConsumer();
21
+        String clientId = consumer.generateClientId();
22
+        assertThat(clientId).startsWith("mqttx_e216fbf16");
23
+    }
24
+}

+ 83
- 0
iot-platform/src/test/java/com/iot/platform/service/SysrealtimeServiceTest.java Переглянути файл

@@ -0,0 +1,83 @@
1
+package com.iot.platform.service;
2
+
3
+import com.iot.platform.mapper.SysrealtimeMapper;
4
+import org.junit.jupiter.api.DisplayName;
5
+import org.junit.jupiter.api.Test;
6
+import org.junit.jupiter.api.extension.ExtendWith;
7
+import org.mockito.InjectMocks;
8
+import org.mockito.Mock;
9
+import org.mockito.junit.jupiter.MockitoExtension;
10
+
11
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
12
+import static org.mockito.Mockito.*;
13
+
14
+@ExtendWith(MockitoExtension.class)
15
+class SysrealtimeServiceTest {
16
+
17
+    @Mock
18
+    private SysrealtimeMapper sysrealtimeMapper;
19
+
20
+    @InjectMocks
21
+    private SysrealtimeService service;
22
+
23
+    @Test
24
+    @DisplayName("createrealtime: 空表名应抛出 IllegalArgumentException")
25
+    void createrealtime_nullTableName_throws() {
26
+        assertThatThrownBy(() -> service.createrealtime(null))
27
+            .isInstanceOf(IllegalArgumentException.class)
28
+            .hasMessageContaining("表名不能为空");
29
+    }
30
+
31
+    @Test
32
+    @DisplayName("createrealtime: SQL注入表名应抛出 IllegalArgumentException")
33
+    void createrealtime_sqlInjectionTableName_throws() {
34
+        assertThatThrownBy(() -> service.createrealtime("users; DROP TABLE users;"))
35
+            .isInstanceOf(IllegalArgumentException.class)
36
+            .hasMessageContaining("非法表名");
37
+    }
38
+
39
+    @Test
40
+    @DisplayName("createrealtime: 超长表名应抛出 IllegalArgumentException")
41
+    void createrealtime_tooLongTableName_throws() {
42
+        assertThatThrownBy(() -> service.createrealtime("a".repeat(65)))
43
+            .isInstanceOf(IllegalArgumentException.class)
44
+            .hasMessageContaining("表名过长");
45
+    }
46
+
47
+    @Test
48
+    @DisplayName("createrealtime: 合法表名应调用 mapper")
49
+    void createrealtime_validTableName_callsMapper() {
50
+        service.createrealtime("device_data_202401");
51
+        verify(sysrealtimeMapper).createrealtime("device_data_202401");
52
+    }
53
+
54
+    @Test
55
+    @DisplayName("inserttables: 非法表名应抛出异常且不调用 mapper")
56
+    void inserttables_invalidTableName_throwsWithoutCallingMapper() {
57
+        assertThatThrownBy(() -> service.inserttables("bad;name", "2024-01-01", "D1", "ts", "k", "v"))
58
+            .isInstanceOf(IllegalArgumentException.class);
59
+        verifyNoInteractions(sysrealtimeMapper);
60
+    }
61
+
62
+    @Test
63
+    @DisplayName("updatetables: 合法表名应调用 mapper")
64
+    void updatetables_validTableName_callsMapper() {
65
+        service.updatetables("dev_001", "2024-01-01", "v1", "ts", "key1", "D1");
66
+        verify(sysrealtimeMapper).updatetables("dev_001", "2024-01-01", "v1", "ts", "key1", "D1");
67
+    }
68
+
69
+    @Test
70
+    @DisplayName("selectkey: 非法表名应抛出异常")
71
+    void selectkey_invalidTableName_throws() {
72
+        assertThatThrownBy(() -> service.selectkey("--comment", "k"))
73
+            .isInstanceOf(IllegalArgumentException.class)
74
+            .hasMessageContaining("非法表名");
75
+    }
76
+
77
+    @Test
78
+    @DisplayName("selectAllKeys: 合法表名应调用 mapper")
79
+    void selectAllKeys_validTableName_callsMapper() {
80
+        service.selectAllKeys("realtime_001");
81
+        verify(sysrealtimeMapper).selectAllKeys("realtime_001");
82
+    }
83
+}

+ 16
- 0
start.sh Переглянути файл

@@ -0,0 +1,16 @@
1
+#!/bin/bash
2
+# IoT 平台启动脚本 — 自动加载 .env 环境变量后启动
3
+
4
+APP_DIR="$(cd "$(dirname "$0")" && pwd)"
5
+cd "$APP_DIR"
6
+
7
+if [ -f ".env" ]; then
8
+    set -a
9
+    source .env
10
+    set +a
11
+    echo "[start.sh] 已加载 .env 环境变量"
12
+else
13
+    echo "[start.sh] 警告: 未找到 .env 文件"
14
+fi
15
+
16
+./ry.sh "$@"

Завантаження…
Відмінити
Зберегти