#include #include "MQTT_ESP.h" #include "STATUS_LED.h" #include "MODBUS_ESP.h" #include "SNTP_ESP.h" #include "OFFLINE_STORAGE.h" #include "cJSON.h" #include "esp_wifi.h" #include "esp_mac.h" #include "esp_system.h" #include "esp_netif.h" #include static const char *TAG = "mqtt_esp"; // 全局MQTT客户端句柄 static esp_mqtt_client_handle_t g_client = NULL; // ============================ // 设备状态上报任务相关 // ============================ static TaskHandle_t device_status_task_handle = NULL; static uint32_t g_report_interval_ms = 10000; // 默认10秒上报一次 static SemaphoreHandle_t report_interval_mutex = NULL; static bool g_device_status_task_started = false; // 设备状态任务是否已启动 static bool g_device_status_task_auto_start = false; // 是否在MQTT连接后自动启动 // ============================ // 离线数据补传任务相关 // ============================ static TaskHandle_t offline_upload_task_handle = NULL; static bool g_is_online = false; // 网络在线状态 static char offline_data_buffer[2048]; // 静态缓冲区,避免栈溢出 /** * @brief 获取设备MAC地址字符串 */ static void get_device_mac(char *mac_str, size_t max_len) { uint8_t mac[6]; esp_read_mac(mac, ESP_MAC_WIFI_STA); snprintf(mac_str, max_len, "%02X:%02X:%02X:%02X:%02X:%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); } /** * @brief 获取设备运行时间(秒) */ static uint32_t get_device_uptime(void) { return (uint32_t)(xTaskGetTickCount() * portTICK_PERIOD_MS / 1000); } /** * @brief 获取设备IP地址字符串 */ static void get_device_ip(char *ip_str, size_t max_len) { esp_netif_t *netif = esp_netif_get_handle_from_ifkey("ETH_DEF"); if (netif != NULL) { esp_netif_ip_info_t ip_info; if (esp_netif_get_ip_info(netif, &ip_info) == ESP_OK) { snprintf(ip_str, max_len, IPSTR, IP2STR(&ip_info.ip)); } else { strncpy(ip_str, "N/A", max_len); } } else { strncpy(ip_str, "N/A", max_len); } } /** * @brief 获取当前时间字符串 */ static void get_current_time(char *time_str, size_t max_len) { // 使用SNTP组件获取格式化时间 if (sntp_esp_get_formatted_time(time_str, max_len, "%Y-%m-%d %H:%M:%S") != ESP_OK) { // 如果获取失败,使用本地时间 time_t now; time(&now); struct tm timeinfo; localtime_r(&now, &timeinfo); strftime(time_str, max_len, "%Y-%m-%d %H:%M:%S", &timeinfo); } } /** * @brief 获取LED状态描述(中文) */ static const char* get_led_state_desc(led_state_t state) { switch (state) { case LED_OFF: return "关闭"; case LED_ON: return "常亮"; case LED_BLINK_SLOW: return "慢闪"; case LED_BLINK_FAST: return "快闪"; case LED_HEARTBEAT: return "心跳"; default: return "未知"; } } /** * @brief 获取运行时间描述(中文) */ static void get_uptime_desc(uint32_t uptime_sec, char *desc, size_t max_len) { uint32_t days = uptime_sec / 86400; uint32_t hours = (uptime_sec % 86400) / 3600; uint32_t minutes = (uptime_sec % 3600) / 60; uint32_t seconds = uptime_sec % 60; if (days > 0) { snprintf(desc, max_len, "%lu天%lu小时%lu分%lu秒", days, hours, minutes, seconds); } else if (hours > 0) { snprintf(desc, max_len, "%lu小时%lu分%lu秒", hours, minutes, seconds); } else if (minutes > 0) { snprintf(desc, max_len, "%lu分%lu秒", minutes, seconds); } else { snprintf(desc, max_len, "%lu秒", seconds); } } /** * @brief 构建设备状态JSON */ static char* build_device_status_json(void) { char mac_str[18]; char ip_str[16]; char time_str[32]; char uptime_desc[64]; get_device_mac(mac_str, sizeof(mac_str)); get_device_ip(ip_str, sizeof(ip_str)); get_current_time(time_str, sizeof(time_str)); get_uptime_desc(get_device_uptime(), uptime_desc, sizeof(uptime_desc)); cJSON *root = cJSON_CreateObject(); if (root == NULL) { return NULL; } // 基本信息 cJSON_AddStringToObject(root, "message_type", "device_status"); cJSON_AddStringToObject(root, "mac_address", mac_str); cJSON_AddStringToObject(root, "ip_address", ip_str); cJSON_AddStringToObject(root, "chip_model", "ESP32-S3"); // IDF 版本 cJSON_AddStringToObject(root, "idf_version", esp_get_idf_version()); // 运行状态 cJSON_AddNumberToObject(root, "uptime", get_device_uptime()); cJSON_AddStringToObject(root, "uptime_desc", uptime_desc); // 中文描述 cJSON_AddNumberToObject(root, "free_heap", esp_get_free_heap_size()); cJSON_AddStringToObject(root, "status", "online"); cJSON_AddStringToObject(root, "status_desc", "在线"); // 中文描述 cJSON_AddStringToObject(root, "update_time", time_str); // 更新时间 // LED 状态(带中文描述) led_state_t led1_state = status_led_get_state(1); led_state_t led2_state = status_led_get_state(2); cJSON_AddNumberToObject(root, "led1_state", (int)led1_state); cJSON_AddStringToObject(root, "led1_desc", get_led_state_desc(led1_state)); // LED1: 网络状态 cJSON_AddNumberToObject(root, "led2_state", (int)led2_state); cJSON_AddStringToObject(root, "led2_desc", get_led_state_desc(led2_state)); // LED2: 通信状态 // LED 功能说明(中文) cJSON_AddStringToObject(root, "led1_function", "网络状态灯"); cJSON_AddStringToObject(root, "led2_function", "通信状态灯"); // MODBUS 轮询状态 modbus_poll_config_t *modbus_config = modbus_get_current_config(); if (modbus_config != NULL) { cJSON_AddNumberToObject(root, "modbus_enabled", modbus_config->enabled); cJSON_AddStringToObject(root, "modbus_enabled_desc", modbus_config->enabled ? "启用" : "禁用"); cJSON_AddNumberToObject(root, "modbus_channel", modbus_config->channel_num); cJSON_AddStringToObject(root, "modbus_channel_desc", modbus_config->channel_num == 0 ? "通道0 (UART0)" : "通道1 (UART2)"); cJSON_AddNumberToObject(root, "modbus_slave_addr", modbus_config->slave_addr); cJSON_AddNumberToObject(root, "modbus_interval", modbus_config->poll_interval_ms); } else { cJSON_AddNumberToObject(root, "modbus_enabled", 0); cJSON_AddStringToObject(root, "modbus_enabled_desc", "未配置"); cJSON_AddNumberToObject(root, "modbus_channel", 0); cJSON_AddStringToObject(root, "modbus_channel_desc", "N/A"); cJSON_AddNumberToObject(root, "modbus_slave_addr", 0); cJSON_AddNumberToObject(root, "modbus_interval", 0); } // 内存使用情况 uint32_t total_heap = esp_get_free_heap_size(); cJSON_AddStringToObject(root, "heap_status", total_heap > 100000 ? "充足" : (total_heap > 50000 ? "一般" : "紧张")); char *json_str = cJSON_Print(root); cJSON_Delete(root); return json_str; } /** * @brief 设备状态上报任务 */ static void device_status_report_task(void *arg) { ESP_LOGI(TAG, "设备状态上报任务已启动"); while (1) { // 检查是否应该退出 if (device_status_task_handle == NULL) { break; } // 构建设备状态JSON char *status_json = build_device_status_json(); if (status_json != NULL) { // 如果网络在线,直接发布;如果离线,存储到本地Flash if (g_is_online && g_client != NULL) { // 发布到MQTT int ret = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC, status_json, strlen(status_json), 0, 0); if (ret >= 0) { ESP_LOGI(TAG, "设备状态已发布,消息ID=%d", ret); ESP_LOGD(TAG, "状态: %s", status_json); } else { ESP_LOGW(TAG, "发布设备状态失败"); } } else { // 网络离线,存储到本地Flash ESP_LOGW(TAG, "网络离线,设备状态存储到本地"); esp_err_t ret = mqtt_store_offline(status_json, strlen(status_json), OFFLINE_DATA_TYPE_DEVICE_STATUS); if (ret == ESP_OK) { ESP_LOGI(TAG, "设备状态已成功存储到离线存储"); } else { ESP_LOGE(TAG, "存储设备状态到离线存储失败"); } } free(status_json); } // 获取上报间隔 uint32_t interval; xSemaphoreTake(report_interval_mutex, portMAX_DELAY); interval = g_report_interval_ms; xSemaphoreGive(report_interval_mutex); // 等待下一次上报 vTaskDelay(pdMS_TO_TICKS(interval)); } ESP_LOGI(TAG, "设备状态上报任务已退出"); device_status_task_handle = NULL; vTaskDelete(NULL); } BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms) { // 如果网络不在线,只保存配置,延迟到MQTT连接后启动 if (!g_is_online) { // 创建互斥量 if (report_interval_mutex == NULL) { report_interval_mutex = xSemaphoreCreateMutex(); if (report_interval_mutex == NULL) { ESP_LOGE(TAG, "创建报告间隔互斥锁失败"); return pdFALSE; } } // 保存配置 xSemaphoreTake(report_interval_mutex, portMAX_DELAY); g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000; xSemaphoreGive(report_interval_mutex); // 标记为自动启动 g_device_status_task_auto_start = true; ESP_LOGI(TAG, "设备状态任务将在MQTT连接后启动 (间隔=%dms)", g_report_interval_ms); return pdPASS; } // 如果任务已存在,先停止 if (device_status_task_handle != NULL) { mqtt_stop_device_status_task(); vTaskDelay(pdMS_TO_TICKS(100)); } // 创建互斥量 if (report_interval_mutex == NULL) { report_interval_mutex = xSemaphoreCreateMutex(); if (report_interval_mutex == NULL) { ESP_LOGE(TAG, "创建报告间隔互斥锁失败"); return pdFALSE; } } // 更新上报间隔 xSemaphoreTake(report_interval_mutex, portMAX_DELAY); g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000; xSemaphoreGive(report_interval_mutex); // 创建任务 BaseType_t ret = xTaskCreate(device_status_report_task, "dev_status", 8192, NULL, 5, &device_status_task_handle); if (ret == pdPASS) { g_device_status_task_started = true; ESP_LOGI(TAG, "设备状态报告任务已创建 (间隔=%dms)", g_report_interval_ms); } else { ESP_LOGE(TAG, "创建设备状态报告任务失败"); } return ret; } void mqtt_stop_device_status_task(void) { if (device_status_task_handle != NULL) { ESP_LOGI(TAG, "正在停止设备状态报告任务..."); TaskHandle_t temp_handle = device_status_task_handle; device_status_task_handle = NULL; vTaskDelete(temp_handle); vTaskDelay(pdMS_TO_TICKS(200)); ESP_LOGI(TAG, "设备状态报告任务已停止"); } } void mqtt_update_report_interval(uint32_t report_interval_ms) { if (report_interval_mutex != NULL) { xSemaphoreTake(report_interval_mutex, portMAX_DELAY); g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000; xSemaphoreGive(report_interval_mutex); ESP_LOGI(TAG, "Device status report interval updated to %dms", g_report_interval_ms); } } /** * @brief MQTT事件处理函数 * * 处理MQTT客户端的各种事件,包括连接、订阅、发布、数据接收和错误处理。 * * @param handler_args 事件处理器参数 * @param base 事件基础类型 * @param event_id 事件ID * @param event_data 指向MQTT事件数据的指针 */ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32, base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; int msg_id; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED - MQTT已连接"); // 订阅并取消订阅测试主题 msg_id = esp_mqtt_client_subscribe(client, CONFIG_MQTT_SUB_TOPIC, 1); ESP_LOGI(TAG, "订阅发送成功, 消息ID=%d", msg_id); status_led_blink_mode(2, 2); // LED2 心跳:MQTT连接正常 // 网络在线,启动离线数据补传任务 g_is_online = true; ESP_LOGI(TAG, "网络在线,开始离线数据上传"); // 启动设备状态上报任务(如果配置了自动启动) if (g_device_status_task_auto_start && !g_device_status_task_started) { if (mqtt_start_device_status_task(g_report_interval_ms) == pdPASS) { g_device_status_task_started = true; g_device_status_task_auto_start = false; ESP_LOGI(TAG, "MQTT连接时自动启动设备状态任务"); } } break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED - MQTT已断开连接"); status_led_blink_mode(2, 1); // LED2 快闪:MQTT断开,正在重连 // 网络离线,停止离线数据补传任务 g_is_online = false; ESP_LOGI(TAG, "网络离线,停止离线数据上传"); // 停止设备状态上报任务(避免存储新数据) if (g_device_status_task_started) { mqtt_stop_device_status_task(); g_device_status_task_started = false; g_device_status_task_auto_start = true; // 重连后自动启动 ESP_LOGI(TAG, "设备状态任务已停止,将在重连时自动重启"); } break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d, return code=0x%02x ", event->msg_id, (uint8_t)*event->data); // 立即上报一次设备状态 char *status_json = build_device_status_json(); if (status_json != NULL) { msg_id = esp_mqtt_client_publish(client, CONFIG_MQTT_PUB_TOPIC, status_json, strlen(status_json), 0, 0); if (msg_id >= 0) { ESP_LOGI(TAG, "Device status published immediately, msg_id=%d", msg_id); } free(status_json); } break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic); ESP_LOGI(TAG, "DATA=%.*s", event->data_len, event->data); // 解析MQTT控制指令(JSON格式) if (event->data_len > 0 && event->data != NULL) { // 创建临时缓冲区存储JSON数据 char *json_str = malloc(event->data_len + 1); if (json_str != NULL) { memcpy(json_str, event->data, event->data_len); json_str[event->data_len] = '\0'; // 解析JSON cJSON *root = cJSON_Parse(json_str); if (root != NULL) { // 检查是否是MODBUS轮询控制指令 cJSON *cmd = cJSON_GetObjectItem(root, "command"); if (cmd != NULL && cJSON_IsString(cmd)) { if (strcmp(cmd->valuestring, "modbus_poll") == 0) { // 解析轮询配置 cJSON *channel = cJSON_GetObjectItem(root, "channel"); cJSON *slave_addr = cJSON_GetObjectItem(root, "slave_addr"); cJSON *start_addr = cJSON_GetObjectItem(root, "start_addr"); cJSON *reg_count = cJSON_GetObjectItem(root, "reg_count"); cJSON *interval = cJSON_GetObjectItem(root, "interval"); cJSON *enabled = cJSON_GetObjectItem(root, "enabled"); // 验证必要字段 if (channel != NULL && cJSON_IsNumber(channel) && slave_addr != NULL && cJSON_IsNumber(slave_addr) && start_addr != NULL && cJSON_IsNumber(start_addr) && reg_count != NULL && cJSON_IsNumber(reg_count) && interval != NULL && cJSON_IsNumber(interval)) { // 构建轮询配置 modbus_poll_config_t poll_config = { .channel_num = channel->valueint, .slave_addr = (uint8_t)slave_addr->valueint, .start_addr = (uint16_t)start_addr->valueint, .reg_count = (uint16_t)reg_count->valueint, .poll_interval_ms = (uint32_t)interval->valueint, .enabled = (enabled != NULL && cJSON_IsBool(enabled)) ? cJSON_IsTrue(enabled) : true }; // 更新轮询配置 if (modbus_update_poll_config(&poll_config)) { ESP_LOGI(TAG, "通过MQTT更新MODBUS轮询配置成功"); } else { ESP_LOGE(TAG, "通过MQTT更新MODBUS轮询配置失败"); } } else { ESP_LOGE(TAG, "Missing required fields in MODBUS poll command"); } } } cJSON_Delete(root); } else { ESP_LOGE(TAG, "Failed to parse JSON: %s", json_str); } free(json_str); } } break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); // 错误类型处理分支 if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { ESP_LOGI(TAG, "Last error code reported from esp-tls: 0x%x", event->error_handle->esp_tls_last_esp_err); ESP_LOGI(TAG, "Last tls stack error number: 0x%x", event->error_handle->esp_tls_stack_err); ESP_LOGI(TAG, "Last captured errno : %d (%s)", event->error_handle->esp_transport_sock_errno, strerror(event->error_handle->esp_transport_sock_errno)); } else if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) { ESP_LOGI(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code); } else { ESP_LOGW(TAG, "Unknown error type: 0x%x", event->error_handle->error_type); } status_led_blink_mode(2, 0); // LED2 慢闪:MQTT错误 break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } } /** * @brief 启动MQTT客户端应用程序 * * 初始化MQTT客户端配置,注册事件处理程序,并启动MQTT连接 * * @param 无参数 * @return 无返回值 */ void mqtt_app_start(void) { // 配置MQTT客户端结构体,设置代理服务器地址和证书验证 const esp_mqtt_client_config_t mqtt_cfg = { .broker.address.uri = CONFIG_BROKER_URI, // .broker.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start, .credentials.client_id = CONFIG_MQTT_CLIENT_ID, .credentials.username = CONFIG_MQTT_USERNAME, .credentials.authentication.password = CONFIG_MQTT_PASSWORD, }; ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size()); g_client = esp_mqtt_client_init(&mqtt_cfg); /* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */ esp_mqtt_client_register_event(g_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); esp_mqtt_client_start(g_client); } /** * @brief 发布MQTT消息 * * 提供一个外部接口来发布MQTT消息 * * @param topic 发布的主题 * @param data 要发布的数据 * @param len 数据长度 * @param qos QoS级别 (0, 1, 或 2) * @param retain 是否保留消息 * @return 消息ID,如果失败则返回负数 */ int mqtt_publish_message(const char* topic, const char* data, int len, int qos, int retain) { if (g_client == NULL) { ESP_LOGE(TAG, "MQTT client not initialized"); return -1; } // 如果网络离线,存储数据到本地Flash if (!g_is_online) { ESP_LOGW(TAG, "网络离线,数据存储到本地 (主题: %s, 长度: %d)", topic, len); // 根据主题判断数据类型 // 设备状态数据主题通常包含 "status" 或特定的设备状态主题 // 传感器数据使用默认的发布主题,不包含特定状态关键字 offline_data_type_t data_type = OFFLINE_DATA_TYPE_MODBUS; // 只有当主题明确包含"status"关键字时才认为是设备状态数据 // 避免将包含"device"的传感器数据主题误判为设备状态 if (strstr(topic, "status") != NULL) { data_type = OFFLINE_DATA_TYPE_DEVICE_STATUS; } // 存储离线数据 esp_err_t ret = mqtt_store_offline(data, len, data_type); if (ret == ESP_OK) { ESP_LOGI(TAG, "Data stored to offline storage successfully"); return 0; // 返回0表示已存储(模拟成功) } else { ESP_LOGE(TAG, "Failed to store data to offline storage"); return -1; } } // 网络在线,正常发布 int msg_id = esp_mqtt_client_publish(g_client, topic, data, len, qos, retain); if (msg_id < 0) { ESP_LOGE(TAG, "Failed to publish message to topic: %s", topic); return -1; } return msg_id; } // ============================ // 离线数据补传任务 // ============================ /** * @brief 检查并发布离线数据(网络在线时) * * @return ESP_OK 成功处理(包括没有数据的情况) * ESP_FAIL 处理失败 */ static esp_err_t publish_offline_data(void) { if (!offline_storage_has_data()) { return ESP_OK; } // 使用静态缓冲区,避免栈溢出 offline_data_type_t data_type; // 读取最旧的离线数据 esp_err_t ret = offline_storage_read_oldest(offline_data_buffer, sizeof(offline_data_buffer), &data_type); if (ret != ESP_OK) { if (ret == ESP_ERR_NOT_FOUND) { return ESP_OK; // 没有数据,不是错误 } ESP_LOGE(TAG, "Failed to read offline data: %s", esp_err_to_name(ret)); // 读取失败,删除可能损坏的数据记录,避免死循环 ESP_LOGW(TAG, "Deleting potentially corrupted data record"); offline_storage_delete_oldest(); return ret; } // 优先上传传感器数据,设备状态数据丢弃 if (data_type == OFFLINE_DATA_TYPE_DEVICE_STATUS) { ESP_LOGW(TAG, "Discarding device status offline data (priority: sensor data only)"); offline_storage_delete_oldest(); return ESP_OK; // 返回成功,继续处理下一条 } ESP_LOGI(TAG, "Publishing offline data (type=%d, size=%zu)", data_type, strlen(offline_data_buffer)); // 为补发数据添加标识:is_retrospective = true // 解析JSON,添加字段后重新序列化 cJSON *root = cJSON_Parse(offline_data_buffer); if (root != NULL) { // 添加补发标识 cJSON_AddBoolToObject(root, "is_retrospective", true); // 重新序列化为字符串 char *json_with_flag = cJSON_PrintUnformatted(root); if (json_with_flag != NULL) { ESP_LOGI(TAG, "Publishing offline data with retrospective flag: %s", json_with_flag); // 使用新字符串发布 int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC, json_with_flag, strlen(json_with_flag), 0, 0); free(json_with_flag); cJSON_Delete(root); if (msg_id >= 0) { ESP_LOGI(TAG, "Offline data with retrospective flag published successfully, msg_id=%d", msg_id); // 发布成功,删除已上传的数据 ret = offline_storage_delete_oldest(); if (ret != ESP_OK) { ESP_LOGW(TAG, "Failed to delete published offline data"); } return ESP_OK; } else { ESP_LOGE(TAG, "Failed to publish offline data with retrospective flag"); } } else { ESP_LOGE(TAG, "Failed to re-serialize JSON with retrospective flag"); cJSON_Delete(root); } } else { ESP_LOGW(TAG, "Failed to parse JSON for adding retrospective flag, publishing raw data: %s", offline_data_buffer); // 如果JSON解析失败,直接发布原始数据(不添加标识) int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC, offline_data_buffer, strlen(offline_data_buffer), 0, 0); if (msg_id >= 0) { ESP_LOGI(TAG, "Raw offline data published successfully, msg_id=%d", msg_id); // 发布成功,删除已上传的数据 ret = offline_storage_delete_oldest(); if (ret != ESP_OK) { ESP_LOGW(TAG, "Failed to delete published offline data"); } return ESP_OK; } else { ESP_LOGE(TAG, "Failed to publish raw offline data"); } } // 发布失败也删除数据,避免重复尝试 ESP_LOGW(TAG, "Deleting failed data to avoid retry loop"); offline_storage_delete_oldest(); return ESP_FAIL; } /** * @brief 存储离线数据(网络离线时) * * @param data 数据内容 * @param length 数据长度 * @param data_type 数据类型 * @return ESP_OK 成功 * ESP_FAIL 失败 */ esp_err_t mqtt_store_offline(const char *data, size_t length, offline_data_type_t data_type) { if (g_is_online) { ESP_LOGW(TAG, "Network is online, storing data not needed"); return ESP_OK; // 网络在线,不需要存储 } ESP_LOGI(TAG, "Storing offline data (type=%d, size=%zu)", data_type, length); esp_err_t ret = offline_storage_store(data, length, data_type); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to store offline data: %s", esp_err_to_name(ret)); return ESP_FAIL; } // 获取存储使用情况 size_t used = 0, total = 0; if (offline_storage_get_usage(&used, &total) == ESP_OK) { ESP_LOGI(TAG, "Storage usage: %zu / %zu bytes (%.1f%%)", used, total, (used * 100.0) / total); } return ESP_OK; } /** * @brief 离线数据补传任务 * * 当网络在线时,持续检查并上传离线存储的数据 */ static void offline_upload_task(void *pvParameters) { ESP_LOGI(TAG, "Offline data upload task started"); while (true) { if (g_is_online && g_client != NULL) { // 检查并上传离线数据 if (offline_storage_has_data()) { ESP_LOGI(TAG, "Found %u offline data files, uploading...", offline_storage_get_count()); // 尝试发布数据,每次处理一条 publish_offline_data(); } } // 每100毫秒检查一次(提高上传速率) vTaskDelay(pdMS_TO_TICKS(100)); } } /** * @brief 启动离线数据补传任务 * * @return pdTRUE 成功, pdFALSE 失败 */ BaseType_t mqtt_start_offline_upload_task(void) { if (offline_upload_task_handle != NULL) { ESP_LOGW(TAG, "Offline upload task already running"); return pdTRUE; } BaseType_t ret = xTaskCreate(offline_upload_task, "offline_upload", 8192, NULL, 4, &offline_upload_task_handle); if (ret == pdPASS) { ESP_LOGI(TAG, "Offline data upload task started"); } else { ESP_LOGE(TAG, "Failed to create offline data upload task"); } return ret; } /** * @brief 停止离线数据补传任务 */ void mqtt_stop_offline_upload_task(void) { if (offline_upload_task_handle != NULL) { ESP_LOGI(TAG, "Stopping offline data upload task..."); TaskHandle_t temp_handle = offline_upload_task_handle; offline_upload_task_handle = NULL; vTaskDelete(temp_handle); vTaskDelay(pdMS_TO_TICKS(200)); ESP_LOGI(TAG, "Offline data upload task stopped"); } }