增加断网保存数据到FLASH,恢复联网重新补发,并加上标志位

This commit is contained in:
Wang Beihong
2026-02-02 00:31:52 +08:00
parent 86b2425e93
commit d56c730cfe
19 changed files with 1853 additions and 52 deletions

View File

@@ -1,3 +1,3 @@
idf_component_register(SRCS "MQTT_ESP.c"
PRIV_REQUIRES mqtt log STATUS_LED MODBUS_ESP SNTP_ESP json esp_wifi esp_system
PRIV_REQUIRES mqtt log STATUS_LED MODBUS_ESP SNTP_ESP OFFLINE_STORAGE json esp_wifi esp_system
INCLUDE_DIRS "include")

View File

@@ -3,6 +3,7 @@
#include "STATUS_LED.h"
#include "MODBUS_ESP.h"
#include "SNTP_ESP.h"
#include "OFFLINE_STORAGE.h"
#include "cJSON.h"
#include "esp_wifi.h"
#include "esp_mac.h"
@@ -23,6 +24,16 @@ static esp_mqtt_client_handle_t g_client = NULL;
static TaskHandle_t device_status_task_handle = NULL;
static uint32_t g_report_interval_ms = 10000; // 默认10秒上报一次
static SemaphoreHandle_t report_interval_mutex = NULL;
static bool g_device_status_task_started = false; // 设备状态任务是否已启动
static bool g_device_status_task_auto_start = false; // 是否在MQTT连接后自动启动
// ============================
// 离线数据补传任务相关
// ============================
static TaskHandle_t offline_upload_task_handle = NULL;
static bool g_is_online = false; // 网络在线状态
static char offline_data_buffer[2048]; // 静态缓冲区,避免栈溢出
/**
* @brief 获取设备MAC地址字符串
@@ -194,7 +205,7 @@ static char* build_device_status_json(void)
*/
static void device_status_report_task(void *arg)
{
ESP_LOGI(TAG, "Device status report task started");
ESP_LOGI(TAG, "设备状态上报任务已启动");
while (1) {
// 检查是否应该退出
@@ -202,22 +213,32 @@ static void device_status_report_task(void *arg)
break;
}
// 检查MQTT是否已连接
if (g_client != NULL) {
// 构建设备状态JSON
char *status_json = build_device_status_json();
if (status_json != NULL) {
// 构建设备状态JSON
char *status_json = build_device_status_json();
if (status_json != NULL) {
// 如果网络在线直接发布如果离线存储到本地Flash
if (g_is_online && g_client != NULL) {
// 发布到MQTT
int ret = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
status_json, strlen(status_json), 0, 0);
if (ret >= 0) {
ESP_LOGI(TAG, "Device status published, msg_id=%d", ret);
ESP_LOGD(TAG, "Status: %s", status_json);
ESP_LOGI(TAG, "设备状态已发布消息ID=%d", ret);
ESP_LOGD(TAG, "状态: %s", status_json);
} else {
ESP_LOGW(TAG, "Failed to publish device status");
ESP_LOGW(TAG, "发布设备状态失败");
}
} else {
// 网络离线存储到本地Flash
ESP_LOGW(TAG, "网络离线,设备状态存储到本地");
esp_err_t ret = mqtt_store_offline(status_json, strlen(status_json),
OFFLINE_DATA_TYPE_DEVICE_STATUS);
if (ret == ESP_OK) {
ESP_LOGI(TAG, "设备状态已成功存储到离线存储");
} else {
ESP_LOGE(TAG, "存储设备状态到离线存储失败");
}
free(status_json);
}
free(status_json);
}
// 获取上报间隔
@@ -230,13 +251,35 @@ static void device_status_report_task(void *arg)
vTaskDelay(pdMS_TO_TICKS(interval));
}
ESP_LOGI(TAG, "Device status report task exiting");
ESP_LOGI(TAG, "设备状态上报任务已退出");
device_status_task_handle = NULL;
vTaskDelete(NULL);
}
BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms)
{
// 如果网络不在线只保存配置延迟到MQTT连接后启动
if (!g_is_online) {
// 创建互斥量
if (report_interval_mutex == NULL) {
report_interval_mutex = xSemaphoreCreateMutex();
if (report_interval_mutex == NULL) {
ESP_LOGE(TAG, "创建报告间隔互斥锁失败");
return pdFALSE;
}
}
// 保存配置
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
xSemaphoreGive(report_interval_mutex);
// 标记为自动启动
g_device_status_task_auto_start = true;
ESP_LOGI(TAG, "设备状态任务将在MQTT连接后启动 (间隔=%dms)", g_report_interval_ms);
return pdPASS;
}
// 如果任务已存在,先停止
if (device_status_task_handle != NULL) {
mqtt_stop_device_status_task();
@@ -247,7 +290,7 @@ BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms)
if (report_interval_mutex == NULL) {
report_interval_mutex = xSemaphoreCreateMutex();
if (report_interval_mutex == NULL) {
ESP_LOGE(TAG, "Failed to create report interval mutex");
ESP_LOGE(TAG, "创建报告间隔互斥锁失败");
return pdFALSE;
}
}
@@ -259,12 +302,13 @@ BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms)
// 创建任务
BaseType_t ret = xTaskCreate(device_status_report_task, "dev_status",
4096, NULL, 5, &device_status_task_handle);
8192, NULL, 5, &device_status_task_handle);
if (ret == pdPASS) {
ESP_LOGI(TAG, "Device status report task started (interval=%dms)", g_report_interval_ms);
g_device_status_task_started = true;
ESP_LOGI(TAG, "设备状态报告任务已创建 (间隔=%dms)", g_report_interval_ms);
} else {
ESP_LOGE(TAG, "Failed to create device status report task");
ESP_LOGE(TAG, "创建设备状态报告任务失败");
}
return ret;
@@ -273,14 +317,14 @@ BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms)
void mqtt_stop_device_status_task(void)
{
if (device_status_task_handle != NULL) {
ESP_LOGI(TAG, "Stopping device status report task...");
ESP_LOGI(TAG, "正在停止设备状态报告任务...");
TaskHandle_t temp_handle = device_status_task_handle;
device_status_task_handle = NULL;
vTaskDelete(temp_handle);
vTaskDelay(pdMS_TO_TICKS(200));
ESP_LOGI(TAG, "Device status report task stopped");
ESP_LOGI(TAG, "设备状态报告任务已停止");
}
}
@@ -313,15 +357,40 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
int msg_id;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED - MQTT已连接");
// 订阅并取消订阅测试主题
msg_id = esp_mqtt_client_subscribe(client, CONFIG_MQTT_SUB_TOPIC, 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
status_led_blink_mode(2, 2); // LED2 心跳MQTT连接正常
ESP_LOGI(TAG, "订阅发送成功, 消息ID=%d", msg_id);
status_led_blink_mode(2, 2); // LED2 心跳MQTT连接正常
// 网络在线,启动离线数据补传任务
g_is_online = true;
ESP_LOGI(TAG, "网络在线,开始离线数据上传");
// 启动设备状态上报任务(如果配置了自动启动)
if (g_device_status_task_auto_start && !g_device_status_task_started) {
if (mqtt_start_device_status_task(g_report_interval_ms) == pdPASS) {
g_device_status_task_started = true;
g_device_status_task_auto_start = false;
ESP_LOGI(TAG, "MQTT连接时自动启动设备状态任务");
}
}
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED - MQTT已断开连接");
status_led_blink_mode(2, 1); // LED2 快闪MQTT断开正在重连
// 网络离线,停止离线数据补传任务
g_is_online = false;
ESP_LOGI(TAG, "网络离线,停止离线数据上传");
// 停止设备状态上报任务(避免存储新数据)
if (g_device_status_task_started) {
mqtt_stop_device_status_task();
g_device_status_task_started = false;
g_device_status_task_auto_start = true; // 重连后自动启动
ESP_LOGI(TAG, "设备状态任务已停止,将在重连时自动重启");
}
break;
case MQTT_EVENT_SUBSCRIBED:
@@ -388,12 +457,12 @@ status_led_blink_mode(2, 2); // LED2 心跳MQTT连接正常
.enabled = (enabled != NULL && cJSON_IsBool(enabled)) ? cJSON_IsTrue(enabled) : true
};
// 更新轮询配置
if (modbus_update_poll_config(&poll_config)) {
ESP_LOGI(TAG, "MODBUS poll config updated via MQTT");
} else {
ESP_LOGE(TAG, "Failed to update MODBUS poll config");
}
// 更新轮询配置
if (modbus_update_poll_config(&poll_config)) {
ESP_LOGI(TAG, "通过MQTT更新MODBUS轮询配置成功");
} else {
ESP_LOGE(TAG, "通过MQTT更新MODBUS轮询配置失败");
}
} else {
ESP_LOGE(TAG, "Missing required fields in MODBUS poll command");
}
@@ -476,12 +545,243 @@ int mqtt_publish_message(const char* topic, const char* data, int len, int qos,
return -1;
}
// 如果网络离线存储数据到本地Flash
if (!g_is_online) {
ESP_LOGW(TAG, "网络离线,数据存储到本地 (主题: %s, 长度: %d)", topic, len);
// 根据主题判断数据类型
// 设备状态数据主题通常包含 "status" 或特定的设备状态主题
// 传感器数据使用默认的发布主题,不包含特定状态关键字
offline_data_type_t data_type = OFFLINE_DATA_TYPE_MODBUS;
// 只有当主题明确包含"status"关键字时才认为是设备状态数据
// 避免将包含"device"的传感器数据主题误判为设备状态
if (strstr(topic, "status") != NULL) {
data_type = OFFLINE_DATA_TYPE_DEVICE_STATUS;
}
// 存储离线数据
esp_err_t ret = mqtt_store_offline(data, len, data_type);
if (ret == ESP_OK) {
ESP_LOGI(TAG, "Data stored to offline storage successfully");
return 0; // 返回0表示已存储模拟成功
} else {
ESP_LOGE(TAG, "Failed to store data to offline storage");
return -1;
}
}
// 网络在线,正常发布
int msg_id = esp_mqtt_client_publish(g_client, topic, data, len, qos, retain);
if (msg_id < 0) {
ESP_LOGE(TAG, "Failed to publish message to topic: %s", topic);
return -1;
}
ESP_LOGI(TAG, "Published message to topic: %s, msg_id: %d", topic, msg_id);
return msg_id;
}
// ============================
// 离线数据补传任务
// ============================
/**
* @brief 检查并发布离线数据(网络在线时)
*
* @return ESP_OK 成功处理(包括没有数据的情况)
* ESP_FAIL 处理失败
*/
static esp_err_t publish_offline_data(void)
{
if (!offline_storage_has_data()) {
return ESP_OK;
}
// 使用静态缓冲区,避免栈溢出
offline_data_type_t data_type;
// 读取最旧的离线数据
esp_err_t ret = offline_storage_read_oldest(offline_data_buffer, sizeof(offline_data_buffer), &data_type);
if (ret != ESP_OK) {
if (ret == ESP_ERR_NOT_FOUND) {
return ESP_OK; // 没有数据,不是错误
}
ESP_LOGE(TAG, "Failed to read offline data: %s", esp_err_to_name(ret));
// 读取失败,删除可能损坏的数据记录,避免死循环
ESP_LOGW(TAG, "Deleting potentially corrupted data record");
offline_storage_delete_oldest();
return ret;
}
// 优先上传传感器数据,设备状态数据丢弃
if (data_type == OFFLINE_DATA_TYPE_DEVICE_STATUS) {
ESP_LOGW(TAG, "Discarding device status offline data (priority: sensor data only)");
offline_storage_delete_oldest();
return ESP_OK; // 返回成功,继续处理下一条
}
ESP_LOGI(TAG, "Publishing offline data (type=%d, size=%zu)", data_type, strlen(offline_data_buffer));
// 为补发数据添加标识is_retrospective = true
// 解析JSON添加字段后重新序列化
cJSON *root = cJSON_Parse(offline_data_buffer);
if (root != NULL) {
// 添加补发标识
cJSON_AddBoolToObject(root, "is_retrospective", true);
// 重新序列化为字符串
char *json_with_flag = cJSON_PrintUnformatted(root);
if (json_with_flag != NULL) {
ESP_LOGI(TAG, "Publishing offline data with retrospective flag: %s", json_with_flag);
// 使用新字符串发布
int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
json_with_flag, strlen(json_with_flag), 0, 0);
free(json_with_flag);
cJSON_Delete(root);
if (msg_id >= 0) {
ESP_LOGI(TAG, "Offline data with retrospective flag published successfully, msg_id=%d", msg_id);
// 发布成功,删除已上传的数据
ret = offline_storage_delete_oldest();
if (ret != ESP_OK) {
ESP_LOGW(TAG, "Failed to delete published offline data");
}
return ESP_OK;
} else {
ESP_LOGE(TAG, "Failed to publish offline data with retrospective flag");
}
} else {
ESP_LOGE(TAG, "Failed to re-serialize JSON with retrospective flag");
cJSON_Delete(root);
}
} else {
ESP_LOGW(TAG, "Failed to parse JSON for adding retrospective flag, publishing raw data: %s", offline_data_buffer);
// 如果JSON解析失败直接发布原始数据不添加标识
int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
offline_data_buffer, strlen(offline_data_buffer), 0, 0);
if (msg_id >= 0) {
ESP_LOGI(TAG, "Raw offline data published successfully, msg_id=%d", msg_id);
// 发布成功,删除已上传的数据
ret = offline_storage_delete_oldest();
if (ret != ESP_OK) {
ESP_LOGW(TAG, "Failed to delete published offline data");
}
return ESP_OK;
} else {
ESP_LOGE(TAG, "Failed to publish raw offline data");
}
}
// 发布失败也删除数据,避免重复尝试
ESP_LOGW(TAG, "Deleting failed data to avoid retry loop");
offline_storage_delete_oldest();
return ESP_FAIL;
}
/**
* @brief 存储离线数据(网络离线时)
*
* @param data 数据内容
* @param length 数据长度
* @param data_type 数据类型
* @return ESP_OK 成功
* ESP_FAIL 失败
*/
esp_err_t mqtt_store_offline(const char *data, size_t length, offline_data_type_t data_type)
{
if (g_is_online) {
ESP_LOGW(TAG, "Network is online, storing data not needed");
return ESP_OK; // 网络在线,不需要存储
}
ESP_LOGI(TAG, "Storing offline data (type=%d, size=%zu)", data_type, length);
esp_err_t ret = offline_storage_store(data, length, data_type);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Failed to store offline data: %s", esp_err_to_name(ret));
return ESP_FAIL;
}
// 获取存储使用情况
size_t used = 0, total = 0;
if (offline_storage_get_usage(&used, &total) == ESP_OK) {
ESP_LOGI(TAG, "Storage usage: %zu / %zu bytes (%.1f%%)",
used, total, (used * 100.0) / total);
}
return ESP_OK;
}
/**
* @brief 离线数据补传任务
*
* 当网络在线时,持续检查并上传离线存储的数据
*/
static void offline_upload_task(void *pvParameters)
{
ESP_LOGI(TAG, "Offline data upload task started");
while (true) {
if (g_is_online && g_client != NULL) {
// 检查并上传离线数据
if (offline_storage_has_data()) {
ESP_LOGI(TAG, "Found %u offline data files, uploading...",
offline_storage_get_count());
// 尝试发布数据,每次处理一条
publish_offline_data();
}
}
// 每100毫秒检查一次提高上传速率
vTaskDelay(pdMS_TO_TICKS(100));
}
}
/**
* @brief 启动离线数据补传任务
*
* @return pdTRUE 成功, pdFALSE 失败
*/
BaseType_t mqtt_start_offline_upload_task(void)
{
if (offline_upload_task_handle != NULL) {
ESP_LOGW(TAG, "Offline upload task already running");
return pdTRUE;
}
BaseType_t ret = xTaskCreate(offline_upload_task, "offline_upload",
8192, NULL, 4, &offline_upload_task_handle);
if (ret == pdPASS) {
ESP_LOGI(TAG, "Offline data upload task started");
} else {
ESP_LOGE(TAG, "Failed to create offline data upload task");
}
return ret;
}
/**
* @brief 停止离线数据补传任务
*/
void mqtt_stop_offline_upload_task(void)
{
if (offline_upload_task_handle != NULL) {
ESP_LOGI(TAG, "Stopping offline data upload task...");
TaskHandle_t temp_handle = offline_upload_task_handle;
offline_upload_task_handle = NULL;
vTaskDelete(temp_handle);
vTaskDelay(pdMS_TO_TICKS(200));
ESP_LOGI(TAG, "Offline data upload task stopped");
}
}

View File

@@ -1,5 +1,6 @@
#include "mqtt_client.h"
#include "esp_log.h"
#include "OFFLINE_STORAGE.h"
void mqtt_app_start(void);
int mqtt_publish_message(const char* topic, const char* data, int len, int qos, int retain);
@@ -22,4 +23,27 @@ void mqtt_stop_device_status_task(void);
*
* @param report_interval_ms 新的上报间隔(毫秒)
*/
void mqtt_update_report_interval(uint32_t report_interval_ms);
void mqtt_update_report_interval(uint32_t report_interval_ms);
/**
* @brief 存储离线数据(网络离线时使用)
*
* @param data 数据内容
* @param length 数据长度
* @param data_type 数据类型
* @return ESP_OK 成功
* ESP_FAIL 失败
*/
esp_err_t mqtt_store_offline(const char *data, size_t length, offline_data_type_t data_type);
/**
* @brief 启动离线数据补传任务
*
* @return pdTRUE 成功, pdFALSE 失败
*/
BaseType_t mqtt_start_offline_upload_task(void);
/**
* @brief 停止离线数据补传任务
*/
void mqtt_stop_offline_upload_task(void);