Files

788 lines
29 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include <stdio.h>
#include "MQTT_ESP.h"
#include "STATUS_LED.h"
#include "MODBUS_ESP.h"
#include "SNTP_ESP.h"
#include "OFFLINE_STORAGE.h"
#include "cJSON.h"
#include "esp_wifi.h"
#include "esp_mac.h"
#include "esp_system.h"
#include "esp_netif.h"
#include <time.h>
static const char *TAG = "mqtt_esp";
// 全局MQTT客户端句柄
static esp_mqtt_client_handle_t g_client = NULL;
// ============================
// 设备状态上报任务相关
// ============================
static TaskHandle_t device_status_task_handle = NULL;
static uint32_t g_report_interval_ms = 10000; // 默认10秒上报一次
static SemaphoreHandle_t report_interval_mutex = NULL;
static bool g_device_status_task_started = false; // 设备状态任务是否已启动
static bool g_device_status_task_auto_start = false; // 是否在MQTT连接后自动启动
// ============================
// 离线数据补传任务相关
// ============================
static TaskHandle_t offline_upload_task_handle = NULL;
static bool g_is_online = false; // 网络在线状态
static char offline_data_buffer[2048]; // 静态缓冲区,避免栈溢出
/**
* @brief 获取设备MAC地址字符串
*/
static void get_device_mac(char *mac_str, size_t max_len)
{
uint8_t mac[6];
esp_read_mac(mac, ESP_MAC_WIFI_STA);
snprintf(mac_str, max_len, "%02X:%02X:%02X:%02X:%02X:%02X",
mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
}
/**
* @brief 获取设备运行时间(秒)
*/
static uint32_t get_device_uptime(void)
{
return (uint32_t)(xTaskGetTickCount() * portTICK_PERIOD_MS / 1000);
}
/**
* @brief 获取设备IP地址字符串
*/
static void get_device_ip(char *ip_str, size_t max_len)
{
esp_netif_t *netif = esp_netif_get_handle_from_ifkey("ETH_DEF");
if (netif != NULL) {
esp_netif_ip_info_t ip_info;
if (esp_netif_get_ip_info(netif, &ip_info) == ESP_OK) {
snprintf(ip_str, max_len, IPSTR, IP2STR(&ip_info.ip));
} else {
strncpy(ip_str, "N/A", max_len);
}
} else {
strncpy(ip_str, "N/A", max_len);
}
}
/**
* @brief 获取当前时间字符串
*/
static void get_current_time(char *time_str, size_t max_len)
{
// 使用SNTP组件获取格式化时间
if (sntp_esp_get_formatted_time(time_str, max_len, "%Y-%m-%d %H:%M:%S") != ESP_OK) {
// 如果获取失败,使用本地时间
time_t now;
time(&now);
struct tm timeinfo;
localtime_r(&now, &timeinfo);
strftime(time_str, max_len, "%Y-%m-%d %H:%M:%S", &timeinfo);
}
}
/**
* @brief 获取LED状态描述中文
*/
static const char* get_led_state_desc(led_state_t state)
{
switch (state) {
case LED_OFF: return "关闭";
case LED_ON: return "常亮";
case LED_BLINK_SLOW: return "慢闪";
case LED_BLINK_FAST: return "快闪";
case LED_HEARTBEAT: return "心跳";
default: return "未知";
}
}
/**
* @brief 获取运行时间描述(中文)
*/
static void get_uptime_desc(uint32_t uptime_sec, char *desc, size_t max_len)
{
uint32_t days = uptime_sec / 86400;
uint32_t hours = (uptime_sec % 86400) / 3600;
uint32_t minutes = (uptime_sec % 3600) / 60;
uint32_t seconds = uptime_sec % 60;
if (days > 0) {
snprintf(desc, max_len, "%lu天%lu小时%lu分%lu秒", days, hours, minutes, seconds);
} else if (hours > 0) {
snprintf(desc, max_len, "%lu小时%lu分%lu秒", hours, minutes, seconds);
} else if (minutes > 0) {
snprintf(desc, max_len, "%lu分%lu秒", minutes, seconds);
} else {
snprintf(desc, max_len, "%lu秒", seconds);
}
}
/**
* @brief 构建设备状态JSON
*/
static char* build_device_status_json(void)
{
char mac_str[18];
char ip_str[16];
char time_str[32];
char uptime_desc[64];
get_device_mac(mac_str, sizeof(mac_str));
get_device_ip(ip_str, sizeof(ip_str));
get_current_time(time_str, sizeof(time_str));
get_uptime_desc(get_device_uptime(), uptime_desc, sizeof(uptime_desc));
cJSON *root = cJSON_CreateObject();
if (root == NULL) {
return NULL;
}
// 基本信息
cJSON_AddStringToObject(root, "message_type", "device_status");
cJSON_AddStringToObject(root, "mac_address", mac_str);
cJSON_AddStringToObject(root, "ip_address", ip_str);
cJSON_AddStringToObject(root, "chip_model", "ESP32-S3");
// IDF 版本
cJSON_AddStringToObject(root, "idf_version", esp_get_idf_version());
// 运行状态
cJSON_AddNumberToObject(root, "uptime", get_device_uptime());
cJSON_AddStringToObject(root, "uptime_desc", uptime_desc); // 中文描述
cJSON_AddNumberToObject(root, "free_heap", esp_get_free_heap_size());
cJSON_AddStringToObject(root, "status", "online");
cJSON_AddStringToObject(root, "status_desc", "在线"); // 中文描述
cJSON_AddStringToObject(root, "update_time", time_str); // 更新时间
// LED 状态(带中文描述)
led_state_t led1_state = status_led_get_state(1);
led_state_t led2_state = status_led_get_state(2);
cJSON_AddNumberToObject(root, "led1_state", (int)led1_state);
cJSON_AddStringToObject(root, "led1_desc", get_led_state_desc(led1_state)); // LED1: 网络状态
cJSON_AddNumberToObject(root, "led2_state", (int)led2_state);
cJSON_AddStringToObject(root, "led2_desc", get_led_state_desc(led2_state)); // LED2: 通信状态
// LED 功能说明(中文)
cJSON_AddStringToObject(root, "led1_function", "网络状态灯");
cJSON_AddStringToObject(root, "led2_function", "通信状态灯");
// MODBUS 轮询状态
modbus_poll_config_t *modbus_config = modbus_get_current_config();
if (modbus_config != NULL) {
cJSON_AddNumberToObject(root, "modbus_enabled", modbus_config->enabled);
cJSON_AddStringToObject(root, "modbus_enabled_desc", modbus_config->enabled ? "启用" : "禁用");
cJSON_AddNumberToObject(root, "modbus_channel", modbus_config->channel_num);
cJSON_AddStringToObject(root, "modbus_channel_desc", modbus_config->channel_num == 0 ? "通道0 (UART0)" : "通道1 (UART2)");
cJSON_AddNumberToObject(root, "modbus_slave_addr", modbus_config->slave_addr);
cJSON_AddNumberToObject(root, "modbus_interval", modbus_config->poll_interval_ms);
} else {
cJSON_AddNumberToObject(root, "modbus_enabled", 0);
cJSON_AddStringToObject(root, "modbus_enabled_desc", "未配置");
cJSON_AddNumberToObject(root, "modbus_channel", 0);
cJSON_AddStringToObject(root, "modbus_channel_desc", "N/A");
cJSON_AddNumberToObject(root, "modbus_slave_addr", 0);
cJSON_AddNumberToObject(root, "modbus_interval", 0);
}
// 内存使用情况
uint32_t total_heap = esp_get_free_heap_size();
cJSON_AddStringToObject(root, "heap_status", total_heap > 100000 ? "充足" : (total_heap > 50000 ? "一般" : "紧张"));
char *json_str = cJSON_Print(root);
cJSON_Delete(root);
return json_str;
}
/**
* @brief 设备状态上报任务
*/
static void device_status_report_task(void *arg)
{
ESP_LOGI(TAG, "设备状态上报任务已启动");
while (1) {
// 检查是否应该退出
if (device_status_task_handle == NULL) {
break;
}
// 构建设备状态JSON
char *status_json = build_device_status_json();
if (status_json != NULL) {
// 如果网络在线直接发布如果离线存储到本地Flash
if (g_is_online && g_client != NULL) {
// 发布到MQTT
int ret = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
status_json, strlen(status_json), 0, 0);
if (ret >= 0) {
ESP_LOGI(TAG, "设备状态已发布消息ID=%d", ret);
ESP_LOGD(TAG, "状态: %s", status_json);
} else {
ESP_LOGW(TAG, "发布设备状态失败");
}
} else {
// 网络离线存储到本地Flash
ESP_LOGW(TAG, "网络离线,设备状态存储到本地");
esp_err_t ret = mqtt_store_offline(status_json, strlen(status_json),
OFFLINE_DATA_TYPE_DEVICE_STATUS);
if (ret == ESP_OK) {
ESP_LOGI(TAG, "设备状态已成功存储到离线存储");
} else {
ESP_LOGE(TAG, "存储设备状态到离线存储失败");
}
}
free(status_json);
}
// 获取上报间隔
uint32_t interval;
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
interval = g_report_interval_ms;
xSemaphoreGive(report_interval_mutex);
// 等待下一次上报
vTaskDelay(pdMS_TO_TICKS(interval));
}
ESP_LOGI(TAG, "设备状态上报任务已退出");
device_status_task_handle = NULL;
vTaskDelete(NULL);
}
BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms)
{
// 如果网络不在线只保存配置延迟到MQTT连接后启动
if (!g_is_online) {
// 创建互斥量
if (report_interval_mutex == NULL) {
report_interval_mutex = xSemaphoreCreateMutex();
if (report_interval_mutex == NULL) {
ESP_LOGE(TAG, "创建报告间隔互斥锁失败");
return pdFALSE;
}
}
// 保存配置
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
xSemaphoreGive(report_interval_mutex);
// 标记为自动启动
g_device_status_task_auto_start = true;
ESP_LOGI(TAG, "设备状态任务将在MQTT连接后启动 (间隔=%dms)", g_report_interval_ms);
return pdPASS;
}
// 如果任务已存在,先停止
if (device_status_task_handle != NULL) {
mqtt_stop_device_status_task();
vTaskDelay(pdMS_TO_TICKS(100));
}
// 创建互斥量
if (report_interval_mutex == NULL) {
report_interval_mutex = xSemaphoreCreateMutex();
if (report_interval_mutex == NULL) {
ESP_LOGE(TAG, "创建报告间隔互斥锁失败");
return pdFALSE;
}
}
// 更新上报间隔
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
xSemaphoreGive(report_interval_mutex);
// 创建任务
BaseType_t ret = xTaskCreate(device_status_report_task, "dev_status",
8192, NULL, 5, &device_status_task_handle);
if (ret == pdPASS) {
g_device_status_task_started = true;
ESP_LOGI(TAG, "设备状态报告任务已创建 (间隔=%dms)", g_report_interval_ms);
} else {
ESP_LOGE(TAG, "创建设备状态报告任务失败");
}
return ret;
}
void mqtt_stop_device_status_task(void)
{
if (device_status_task_handle != NULL) {
ESP_LOGI(TAG, "正在停止设备状态报告任务...");
TaskHandle_t temp_handle = device_status_task_handle;
device_status_task_handle = NULL;
vTaskDelete(temp_handle);
vTaskDelay(pdMS_TO_TICKS(200));
ESP_LOGI(TAG, "设备状态报告任务已停止");
}
}
void mqtt_update_report_interval(uint32_t report_interval_ms)
{
if (report_interval_mutex != NULL) {
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
xSemaphoreGive(report_interval_mutex);
ESP_LOGI(TAG, "Device status report interval updated to %dms", g_report_interval_ms);
}
}
/**
* @brief MQTT事件处理函数
*
* 处理MQTT客户端的各种事件包括连接、订阅、发布、数据接收和错误处理。
*
* @param handler_args 事件处理器参数
* @param base 事件基础类型
* @param event_id 事件ID
* @param event_data 指向MQTT事件数据的指针
*/
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32, base, event_id);
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
int msg_id;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED - MQTT已连接");
// 订阅并取消订阅测试主题
msg_id = esp_mqtt_client_subscribe(client, CONFIG_MQTT_SUB_TOPIC, 1);
ESP_LOGI(TAG, "订阅发送成功, 消息ID=%d", msg_id);
status_led_blink_mode(2, 2); // LED2 心跳MQTT连接正常
// 网络在线,启动离线数据补传任务
g_is_online = true;
ESP_LOGI(TAG, "网络在线,开始离线数据上传");
// 启动设备状态上报任务(如果配置了自动启动)
if (g_device_status_task_auto_start && !g_device_status_task_started) {
if (mqtt_start_device_status_task(g_report_interval_ms) == pdPASS) {
g_device_status_task_started = true;
g_device_status_task_auto_start = false;
ESP_LOGI(TAG, "MQTT连接时自动启动设备状态任务");
}
}
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED - MQTT已断开连接");
status_led_blink_mode(2, 1); // LED2 快闪MQTT断开正在重连
// 网络离线,停止离线数据补传任务
g_is_online = false;
ESP_LOGI(TAG, "网络离线,停止离线数据上传");
// 停止设备状态上报任务(避免存储新数据)
if (g_device_status_task_started) {
mqtt_stop_device_status_task();
g_device_status_task_started = false;
g_device_status_task_auto_start = true; // 重连后自动启动
ESP_LOGI(TAG, "设备状态任务已停止,将在重连时自动重启");
}
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d, return code=0x%02x ", event->msg_id, (uint8_t)*event->data);
// 立即上报一次设备状态
char *status_json = build_device_status_json();
if (status_json != NULL) {
msg_id = esp_mqtt_client_publish(client, CONFIG_MQTT_PUB_TOPIC,
status_json, strlen(status_json), 0, 0);
if (msg_id >= 0) {
ESP_LOGI(TAG, "Device status published immediately, msg_id=%d", msg_id);
}
free(status_json);
}
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
ESP_LOGI(TAG, "DATA=%.*s", event->data_len, event->data);
// 解析MQTT控制指令JSON格式
if (event->data_len > 0 && event->data != NULL) {
// 创建临时缓冲区存储JSON数据
char *json_str = malloc(event->data_len + 1);
if (json_str != NULL) {
memcpy(json_str, event->data, event->data_len);
json_str[event->data_len] = '\0';
// 解析JSON
cJSON *root = cJSON_Parse(json_str);
if (root != NULL) {
// 检查是否是MODBUS轮询控制指令
cJSON *cmd = cJSON_GetObjectItem(root, "command");
if (cmd != NULL && cJSON_IsString(cmd)) {
if (strcmp(cmd->valuestring, "modbus_poll") == 0) {
// 解析轮询配置
cJSON *channel = cJSON_GetObjectItem(root, "channel");
cJSON *slave_addr = cJSON_GetObjectItem(root, "slave_addr");
cJSON *start_addr = cJSON_GetObjectItem(root, "start_addr");
cJSON *reg_count = cJSON_GetObjectItem(root, "reg_count");
cJSON *interval = cJSON_GetObjectItem(root, "interval");
cJSON *enabled = cJSON_GetObjectItem(root, "enabled");
// 验证必要字段
if (channel != NULL && cJSON_IsNumber(channel) &&
slave_addr != NULL && cJSON_IsNumber(slave_addr) &&
start_addr != NULL && cJSON_IsNumber(start_addr) &&
reg_count != NULL && cJSON_IsNumber(reg_count) &&
interval != NULL && cJSON_IsNumber(interval)) {
// 构建轮询配置
modbus_poll_config_t poll_config = {
.channel_num = channel->valueint,
.slave_addr = (uint8_t)slave_addr->valueint,
.start_addr = (uint16_t)start_addr->valueint,
.reg_count = (uint16_t)reg_count->valueint,
.poll_interval_ms = (uint32_t)interval->valueint,
.enabled = (enabled != NULL && cJSON_IsBool(enabled)) ? cJSON_IsTrue(enabled) : true
};
// 更新轮询配置
if (modbus_update_poll_config(&poll_config)) {
ESP_LOGI(TAG, "通过MQTT更新MODBUS轮询配置成功");
} else {
ESP_LOGE(TAG, "通过MQTT更新MODBUS轮询配置失败");
}
} else {
ESP_LOGE(TAG, "Missing required fields in MODBUS poll command");
}
}
}
cJSON_Delete(root);
} else {
ESP_LOGE(TAG, "Failed to parse JSON: %s", json_str);
}
free(json_str);
}
}
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
// 错误类型处理分支
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
ESP_LOGI(TAG, "Last error code reported from esp-tls: 0x%x", event->error_handle->esp_tls_last_esp_err);
ESP_LOGI(TAG, "Last tls stack error number: 0x%x", event->error_handle->esp_tls_stack_err);
ESP_LOGI(TAG, "Last captured errno : %d (%s)", event->error_handle->esp_transport_sock_errno,
strerror(event->error_handle->esp_transport_sock_errno));
} else if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
ESP_LOGI(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code);
} else {
ESP_LOGW(TAG, "Unknown error type: 0x%x", event->error_handle->error_type);
}
status_led_blink_mode(2, 0); // LED2 慢闪MQTT错误
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
/**
* @brief 启动MQTT客户端应用程序
*
* 初始化MQTT客户端配置注册事件处理程序并启动MQTT连接
*
* @param 无参数
* @return 无返回值
*/
void mqtt_app_start(void)
{
// 配置MQTT客户端结构体设置代理服务器地址和证书验证
const esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_BROKER_URI,
// .broker.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start,
.credentials.client_id = CONFIG_MQTT_CLIENT_ID,
.credentials.username = CONFIG_MQTT_USERNAME,
.credentials.authentication.password = CONFIG_MQTT_PASSWORD,
};
ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size());
g_client = esp_mqtt_client_init(&mqtt_cfg);
/* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */
esp_mqtt_client_register_event(g_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
esp_mqtt_client_start(g_client);
}
/**
* @brief 发布MQTT消息
*
* 提供一个外部接口来发布MQTT消息
*
* @param topic 发布的主题
* @param data 要发布的数据
* @param len 数据长度
* @param qos QoS级别 (0, 1, 或 2)
* @param retain 是否保留消息
* @return 消息ID如果失败则返回负数
*/
int mqtt_publish_message(const char* topic, const char* data, int len, int qos, int retain)
{
if (g_client == NULL) {
ESP_LOGE(TAG, "MQTT client not initialized");
return -1;
}
// 如果网络离线存储数据到本地Flash
if (!g_is_online) {
ESP_LOGW(TAG, "网络离线,数据存储到本地 (主题: %s, 长度: %d)", topic, len);
// 根据主题判断数据类型
// 设备状态数据主题通常包含 "status" 或特定的设备状态主题
// 传感器数据使用默认的发布主题,不包含特定状态关键字
offline_data_type_t data_type = OFFLINE_DATA_TYPE_MODBUS;
// 只有当主题明确包含"status"关键字时才认为是设备状态数据
// 避免将包含"device"的传感器数据主题误判为设备状态
if (strstr(topic, "status") != NULL) {
data_type = OFFLINE_DATA_TYPE_DEVICE_STATUS;
}
// 存储离线数据
esp_err_t ret = mqtt_store_offline(data, len, data_type);
if (ret == ESP_OK) {
ESP_LOGI(TAG, "Data stored to offline storage successfully");
return 0; // 返回0表示已存储模拟成功
} else {
ESP_LOGE(TAG, "Failed to store data to offline storage");
return -1;
}
}
// 网络在线,正常发布
int msg_id = esp_mqtt_client_publish(g_client, topic, data, len, qos, retain);
if (msg_id < 0) {
ESP_LOGE(TAG, "Failed to publish message to topic: %s", topic);
return -1;
}
return msg_id;
}
// ============================
// 离线数据补传任务
// ============================
/**
* @brief 检查并发布离线数据(网络在线时)
*
* @return ESP_OK 成功处理(包括没有数据的情况)
* ESP_FAIL 处理失败
*/
static esp_err_t publish_offline_data(void)
{
if (!offline_storage_has_data()) {
return ESP_OK;
}
// 使用静态缓冲区,避免栈溢出
offline_data_type_t data_type;
// 读取最旧的离线数据
esp_err_t ret = offline_storage_read_oldest(offline_data_buffer, sizeof(offline_data_buffer), &data_type);
if (ret != ESP_OK) {
if (ret == ESP_ERR_NOT_FOUND) {
return ESP_OK; // 没有数据,不是错误
}
ESP_LOGE(TAG, "Failed to read offline data: %s", esp_err_to_name(ret));
// 读取失败,删除可能损坏的数据记录,避免死循环
ESP_LOGW(TAG, "Deleting potentially corrupted data record");
offline_storage_delete_oldest();
return ret;
}
// 优先上传传感器数据,设备状态数据丢弃
if (data_type == OFFLINE_DATA_TYPE_DEVICE_STATUS) {
ESP_LOGW(TAG, "Discarding device status offline data (priority: sensor data only)");
offline_storage_delete_oldest();
return ESP_OK; // 返回成功,继续处理下一条
}
ESP_LOGI(TAG, "Publishing offline data (type=%d, size=%zu)", data_type, strlen(offline_data_buffer));
// 为补发数据添加标识is_retrospective = true
// 解析JSON添加字段后重新序列化
cJSON *root = cJSON_Parse(offline_data_buffer);
if (root != NULL) {
// 添加补发标识
cJSON_AddBoolToObject(root, "is_retrospective", true);
// 重新序列化为字符串
char *json_with_flag = cJSON_PrintUnformatted(root);
if (json_with_flag != NULL) {
ESP_LOGI(TAG, "Publishing offline data with retrospective flag: %s", json_with_flag);
// 使用新字符串发布
int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
json_with_flag, strlen(json_with_flag), 0, 0);
free(json_with_flag);
cJSON_Delete(root);
if (msg_id >= 0) {
ESP_LOGI(TAG, "Offline data with retrospective flag published successfully, msg_id=%d", msg_id);
// 发布成功,删除已上传的数据
ret = offline_storage_delete_oldest();
if (ret != ESP_OK) {
ESP_LOGW(TAG, "Failed to delete published offline data");
}
return ESP_OK;
} else {
ESP_LOGE(TAG, "Failed to publish offline data with retrospective flag");
}
} else {
ESP_LOGE(TAG, "Failed to re-serialize JSON with retrospective flag");
cJSON_Delete(root);
}
} else {
ESP_LOGW(TAG, "Failed to parse JSON for adding retrospective flag, publishing raw data: %s", offline_data_buffer);
// 如果JSON解析失败直接发布原始数据不添加标识
int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
offline_data_buffer, strlen(offline_data_buffer), 0, 0);
if (msg_id >= 0) {
ESP_LOGI(TAG, "Raw offline data published successfully, msg_id=%d", msg_id);
// 发布成功,删除已上传的数据
ret = offline_storage_delete_oldest();
if (ret != ESP_OK) {
ESP_LOGW(TAG, "Failed to delete published offline data");
}
return ESP_OK;
} else {
ESP_LOGE(TAG, "Failed to publish raw offline data");
}
}
// 发布失败也删除数据,避免重复尝试
ESP_LOGW(TAG, "Deleting failed data to avoid retry loop");
offline_storage_delete_oldest();
return ESP_FAIL;
}
/**
* @brief 存储离线数据(网络离线时)
*
* @param data 数据内容
* @param length 数据长度
* @param data_type 数据类型
* @return ESP_OK 成功
* ESP_FAIL 失败
*/
esp_err_t mqtt_store_offline(const char *data, size_t length, offline_data_type_t data_type)
{
if (g_is_online) {
ESP_LOGW(TAG, "Network is online, storing data not needed");
return ESP_OK; // 网络在线,不需要存储
}
ESP_LOGI(TAG, "Storing offline data (type=%d, size=%zu)", data_type, length);
esp_err_t ret = offline_storage_store(data, length, data_type);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Failed to store offline data: %s", esp_err_to_name(ret));
return ESP_FAIL;
}
// 获取存储使用情况
size_t used = 0, total = 0;
if (offline_storage_get_usage(&used, &total) == ESP_OK) {
ESP_LOGI(TAG, "Storage usage: %zu / %zu bytes (%.1f%%)",
used, total, (used * 100.0) / total);
}
return ESP_OK;
}
/**
* @brief 离线数据补传任务
*
* 当网络在线时,持续检查并上传离线存储的数据
*/
static void offline_upload_task(void *pvParameters)
{
ESP_LOGI(TAG, "Offline data upload task started");
while (true) {
if (g_is_online && g_client != NULL) {
// 检查并上传离线数据
if (offline_storage_has_data()) {
ESP_LOGI(TAG, "Found %u offline data files, uploading...",
offline_storage_get_count());
// 尝试发布数据,每次处理一条
publish_offline_data();
}
}
// 每100毫秒检查一次提高上传速率
vTaskDelay(pdMS_TO_TICKS(100));
}
}
/**
* @brief 启动离线数据补传任务
*
* @return pdTRUE 成功, pdFALSE 失败
*/
BaseType_t mqtt_start_offline_upload_task(void)
{
if (offline_upload_task_handle != NULL) {
ESP_LOGW(TAG, "Offline upload task already running");
return pdTRUE;
}
BaseType_t ret = xTaskCreate(offline_upload_task, "offline_upload",
8192, NULL, 4, &offline_upload_task_handle);
if (ret == pdPASS) {
ESP_LOGI(TAG, "Offline data upload task started");
} else {
ESP_LOGE(TAG, "Failed to create offline data upload task");
}
return ret;
}
/**
* @brief 停止离线数据补传任务
*/
void mqtt_stop_offline_upload_task(void)
{
if (offline_upload_task_handle != NULL) {
ESP_LOGI(TAG, "Stopping offline data upload task...");
TaskHandle_t temp_handle = offline_upload_task_handle;
offline_upload_task_handle = NULL;
vTaskDelete(temp_handle);
vTaskDelay(pdMS_TO_TICKS(200));
ESP_LOGI(TAG, "Offline data upload task stopped");
}
}