788 lines
29 KiB
C
788 lines
29 KiB
C
#include <stdio.h>
|
||
#include "MQTT_ESP.h"
|
||
#include "STATUS_LED.h"
|
||
#include "MODBUS_ESP.h"
|
||
#include "SNTP_ESP.h"
|
||
#include "OFFLINE_STORAGE.h"
|
||
#include "cJSON.h"
|
||
#include "esp_wifi.h"
|
||
#include "esp_mac.h"
|
||
#include "esp_system.h"
|
||
#include "esp_netif.h"
|
||
#include <time.h>
|
||
|
||
static const char *TAG = "mqtt_esp";
|
||
|
||
|
||
// 全局MQTT客户端句柄
|
||
static esp_mqtt_client_handle_t g_client = NULL;
|
||
|
||
// ============================
|
||
// 设备状态上报任务相关
|
||
// ============================
|
||
|
||
static TaskHandle_t device_status_task_handle = NULL;
|
||
static uint32_t g_report_interval_ms = 10000; // 默认10秒上报一次
|
||
static SemaphoreHandle_t report_interval_mutex = NULL;
|
||
static bool g_device_status_task_started = false; // 设备状态任务是否已启动
|
||
static bool g_device_status_task_auto_start = false; // 是否在MQTT连接后自动启动
|
||
|
||
// ============================
|
||
// 离线数据补传任务相关
|
||
// ============================
|
||
|
||
static TaskHandle_t offline_upload_task_handle = NULL;
|
||
static bool g_is_online = false; // 网络在线状态
|
||
static char offline_data_buffer[2048]; // 静态缓冲区,避免栈溢出
|
||
|
||
/**
|
||
* @brief 获取设备MAC地址字符串
|
||
*/
|
||
static void get_device_mac(char *mac_str, size_t max_len)
|
||
{
|
||
uint8_t mac[6];
|
||
esp_read_mac(mac, ESP_MAC_WIFI_STA);
|
||
snprintf(mac_str, max_len, "%02X:%02X:%02X:%02X:%02X:%02X",
|
||
mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
|
||
}
|
||
|
||
/**
|
||
* @brief 获取设备运行时间(秒)
|
||
*/
|
||
static uint32_t get_device_uptime(void)
|
||
{
|
||
return (uint32_t)(xTaskGetTickCount() * portTICK_PERIOD_MS / 1000);
|
||
}
|
||
|
||
/**
|
||
* @brief 获取设备IP地址字符串
|
||
*/
|
||
static void get_device_ip(char *ip_str, size_t max_len)
|
||
{
|
||
esp_netif_t *netif = esp_netif_get_handle_from_ifkey("ETH_DEF");
|
||
if (netif != NULL) {
|
||
esp_netif_ip_info_t ip_info;
|
||
if (esp_netif_get_ip_info(netif, &ip_info) == ESP_OK) {
|
||
snprintf(ip_str, max_len, IPSTR, IP2STR(&ip_info.ip));
|
||
} else {
|
||
strncpy(ip_str, "N/A", max_len);
|
||
}
|
||
} else {
|
||
strncpy(ip_str, "N/A", max_len);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief 获取当前时间字符串
|
||
*/
|
||
static void get_current_time(char *time_str, size_t max_len)
|
||
{
|
||
// 使用SNTP组件获取格式化时间
|
||
if (sntp_esp_get_formatted_time(time_str, max_len, "%Y-%m-%d %H:%M:%S") != ESP_OK) {
|
||
// 如果获取失败,使用本地时间
|
||
time_t now;
|
||
time(&now);
|
||
struct tm timeinfo;
|
||
localtime_r(&now, &timeinfo);
|
||
strftime(time_str, max_len, "%Y-%m-%d %H:%M:%S", &timeinfo);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief 获取LED状态描述(中文)
|
||
*/
|
||
static const char* get_led_state_desc(led_state_t state)
|
||
{
|
||
switch (state) {
|
||
case LED_OFF: return "关闭";
|
||
case LED_ON: return "常亮";
|
||
case LED_BLINK_SLOW: return "慢闪";
|
||
case LED_BLINK_FAST: return "快闪";
|
||
case LED_HEARTBEAT: return "心跳";
|
||
default: return "未知";
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief 获取运行时间描述(中文)
|
||
*/
|
||
static void get_uptime_desc(uint32_t uptime_sec, char *desc, size_t max_len)
|
||
{
|
||
uint32_t days = uptime_sec / 86400;
|
||
uint32_t hours = (uptime_sec % 86400) / 3600;
|
||
uint32_t minutes = (uptime_sec % 3600) / 60;
|
||
uint32_t seconds = uptime_sec % 60;
|
||
|
||
if (days > 0) {
|
||
snprintf(desc, max_len, "%lu天%lu小时%lu分%lu秒", days, hours, minutes, seconds);
|
||
} else if (hours > 0) {
|
||
snprintf(desc, max_len, "%lu小时%lu分%lu秒", hours, minutes, seconds);
|
||
} else if (minutes > 0) {
|
||
snprintf(desc, max_len, "%lu分%lu秒", minutes, seconds);
|
||
} else {
|
||
snprintf(desc, max_len, "%lu秒", seconds);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief 构建设备状态JSON
|
||
*/
|
||
static char* build_device_status_json(void)
|
||
{
|
||
char mac_str[18];
|
||
char ip_str[16];
|
||
char time_str[32];
|
||
char uptime_desc[64];
|
||
|
||
get_device_mac(mac_str, sizeof(mac_str));
|
||
get_device_ip(ip_str, sizeof(ip_str));
|
||
get_current_time(time_str, sizeof(time_str));
|
||
get_uptime_desc(get_device_uptime(), uptime_desc, sizeof(uptime_desc));
|
||
|
||
cJSON *root = cJSON_CreateObject();
|
||
if (root == NULL) {
|
||
return NULL;
|
||
}
|
||
|
||
// 基本信息
|
||
cJSON_AddStringToObject(root, "message_type", "device_status");
|
||
cJSON_AddStringToObject(root, "mac_address", mac_str);
|
||
cJSON_AddStringToObject(root, "ip_address", ip_str);
|
||
cJSON_AddStringToObject(root, "chip_model", "ESP32-S3");
|
||
|
||
// IDF 版本
|
||
cJSON_AddStringToObject(root, "idf_version", esp_get_idf_version());
|
||
|
||
// 运行状态
|
||
cJSON_AddNumberToObject(root, "uptime", get_device_uptime());
|
||
cJSON_AddStringToObject(root, "uptime_desc", uptime_desc); // 中文描述
|
||
cJSON_AddNumberToObject(root, "free_heap", esp_get_free_heap_size());
|
||
cJSON_AddStringToObject(root, "status", "online");
|
||
cJSON_AddStringToObject(root, "status_desc", "在线"); // 中文描述
|
||
cJSON_AddStringToObject(root, "update_time", time_str); // 更新时间
|
||
|
||
// LED 状态(带中文描述)
|
||
led_state_t led1_state = status_led_get_state(1);
|
||
led_state_t led2_state = status_led_get_state(2);
|
||
cJSON_AddNumberToObject(root, "led1_state", (int)led1_state);
|
||
cJSON_AddStringToObject(root, "led1_desc", get_led_state_desc(led1_state)); // LED1: 网络状态
|
||
cJSON_AddNumberToObject(root, "led2_state", (int)led2_state);
|
||
cJSON_AddStringToObject(root, "led2_desc", get_led_state_desc(led2_state)); // LED2: 通信状态
|
||
|
||
// LED 功能说明(中文)
|
||
cJSON_AddStringToObject(root, "led1_function", "网络状态灯");
|
||
cJSON_AddStringToObject(root, "led2_function", "通信状态灯");
|
||
|
||
// MODBUS 轮询状态
|
||
modbus_poll_config_t *modbus_config = modbus_get_current_config();
|
||
if (modbus_config != NULL) {
|
||
cJSON_AddNumberToObject(root, "modbus_enabled", modbus_config->enabled);
|
||
cJSON_AddStringToObject(root, "modbus_enabled_desc", modbus_config->enabled ? "启用" : "禁用");
|
||
cJSON_AddNumberToObject(root, "modbus_channel", modbus_config->channel_num);
|
||
cJSON_AddStringToObject(root, "modbus_channel_desc", modbus_config->channel_num == 0 ? "通道0 (UART0)" : "通道1 (UART2)");
|
||
cJSON_AddNumberToObject(root, "modbus_slave_addr", modbus_config->slave_addr);
|
||
cJSON_AddNumberToObject(root, "modbus_interval", modbus_config->poll_interval_ms);
|
||
} else {
|
||
cJSON_AddNumberToObject(root, "modbus_enabled", 0);
|
||
cJSON_AddStringToObject(root, "modbus_enabled_desc", "未配置");
|
||
cJSON_AddNumberToObject(root, "modbus_channel", 0);
|
||
cJSON_AddStringToObject(root, "modbus_channel_desc", "N/A");
|
||
cJSON_AddNumberToObject(root, "modbus_slave_addr", 0);
|
||
cJSON_AddNumberToObject(root, "modbus_interval", 0);
|
||
}
|
||
|
||
// 内存使用情况
|
||
uint32_t total_heap = esp_get_free_heap_size();
|
||
cJSON_AddStringToObject(root, "heap_status", total_heap > 100000 ? "充足" : (total_heap > 50000 ? "一般" : "紧张"));
|
||
|
||
char *json_str = cJSON_Print(root);
|
||
cJSON_Delete(root);
|
||
return json_str;
|
||
}
|
||
|
||
/**
|
||
* @brief 设备状态上报任务
|
||
*/
|
||
static void device_status_report_task(void *arg)
|
||
{
|
||
ESP_LOGI(TAG, "设备状态上报任务已启动");
|
||
|
||
while (1) {
|
||
// 检查是否应该退出
|
||
if (device_status_task_handle == NULL) {
|
||
break;
|
||
}
|
||
|
||
// 构建设备状态JSON
|
||
char *status_json = build_device_status_json();
|
||
if (status_json != NULL) {
|
||
// 如果网络在线,直接发布;如果离线,存储到本地Flash
|
||
if (g_is_online && g_client != NULL) {
|
||
// 发布到MQTT
|
||
int ret = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
|
||
status_json, strlen(status_json), 0, 0);
|
||
if (ret >= 0) {
|
||
ESP_LOGI(TAG, "设备状态已发布,消息ID=%d", ret);
|
||
ESP_LOGD(TAG, "状态: %s", status_json);
|
||
} else {
|
||
ESP_LOGW(TAG, "发布设备状态失败");
|
||
}
|
||
} else {
|
||
// 网络离线,存储到本地Flash
|
||
ESP_LOGW(TAG, "网络离线,设备状态存储到本地");
|
||
esp_err_t ret = mqtt_store_offline(status_json, strlen(status_json),
|
||
OFFLINE_DATA_TYPE_DEVICE_STATUS);
|
||
if (ret == ESP_OK) {
|
||
ESP_LOGI(TAG, "设备状态已成功存储到离线存储");
|
||
} else {
|
||
ESP_LOGE(TAG, "存储设备状态到离线存储失败");
|
||
}
|
||
}
|
||
free(status_json);
|
||
}
|
||
|
||
// 获取上报间隔
|
||
uint32_t interval;
|
||
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
|
||
interval = g_report_interval_ms;
|
||
xSemaphoreGive(report_interval_mutex);
|
||
|
||
// 等待下一次上报
|
||
vTaskDelay(pdMS_TO_TICKS(interval));
|
||
}
|
||
|
||
ESP_LOGI(TAG, "设备状态上报任务已退出");
|
||
device_status_task_handle = NULL;
|
||
vTaskDelete(NULL);
|
||
}
|
||
|
||
BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms)
|
||
{
|
||
// 如果网络不在线,只保存配置,延迟到MQTT连接后启动
|
||
if (!g_is_online) {
|
||
// 创建互斥量
|
||
if (report_interval_mutex == NULL) {
|
||
report_interval_mutex = xSemaphoreCreateMutex();
|
||
if (report_interval_mutex == NULL) {
|
||
ESP_LOGE(TAG, "创建报告间隔互斥锁失败");
|
||
return pdFALSE;
|
||
}
|
||
}
|
||
|
||
// 保存配置
|
||
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
|
||
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
|
||
xSemaphoreGive(report_interval_mutex);
|
||
|
||
// 标记为自动启动
|
||
g_device_status_task_auto_start = true;
|
||
ESP_LOGI(TAG, "设备状态任务将在MQTT连接后启动 (间隔=%dms)", g_report_interval_ms);
|
||
return pdPASS;
|
||
}
|
||
|
||
// 如果任务已存在,先停止
|
||
if (device_status_task_handle != NULL) {
|
||
mqtt_stop_device_status_task();
|
||
vTaskDelay(pdMS_TO_TICKS(100));
|
||
}
|
||
|
||
// 创建互斥量
|
||
if (report_interval_mutex == NULL) {
|
||
report_interval_mutex = xSemaphoreCreateMutex();
|
||
if (report_interval_mutex == NULL) {
|
||
ESP_LOGE(TAG, "创建报告间隔互斥锁失败");
|
||
return pdFALSE;
|
||
}
|
||
}
|
||
|
||
// 更新上报间隔
|
||
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
|
||
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
|
||
xSemaphoreGive(report_interval_mutex);
|
||
|
||
// 创建任务
|
||
BaseType_t ret = xTaskCreate(device_status_report_task, "dev_status",
|
||
8192, NULL, 5, &device_status_task_handle);
|
||
|
||
if (ret == pdPASS) {
|
||
g_device_status_task_started = true;
|
||
ESP_LOGI(TAG, "设备状态报告任务已创建 (间隔=%dms)", g_report_interval_ms);
|
||
} else {
|
||
ESP_LOGE(TAG, "创建设备状态报告任务失败");
|
||
}
|
||
|
||
return ret;
|
||
}
|
||
|
||
void mqtt_stop_device_status_task(void)
|
||
{
|
||
if (device_status_task_handle != NULL) {
|
||
ESP_LOGI(TAG, "正在停止设备状态报告任务...");
|
||
|
||
TaskHandle_t temp_handle = device_status_task_handle;
|
||
device_status_task_handle = NULL;
|
||
vTaskDelete(temp_handle);
|
||
|
||
vTaskDelay(pdMS_TO_TICKS(200));
|
||
ESP_LOGI(TAG, "设备状态报告任务已停止");
|
||
}
|
||
}
|
||
|
||
void mqtt_update_report_interval(uint32_t report_interval_ms)
|
||
{
|
||
if (report_interval_mutex != NULL) {
|
||
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
|
||
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
|
||
xSemaphoreGive(report_interval_mutex);
|
||
ESP_LOGI(TAG, "Device status report interval updated to %dms", g_report_interval_ms);
|
||
}
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief MQTT事件处理函数
|
||
*
|
||
* 处理MQTT客户端的各种事件,包括连接、订阅、发布、数据接收和错误处理。
|
||
*
|
||
* @param handler_args 事件处理器参数
|
||
* @param base 事件基础类型
|
||
* @param event_id 事件ID
|
||
* @param event_data 指向MQTT事件数据的指针
|
||
*/
|
||
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
|
||
{
|
||
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32, base, event_id);
|
||
esp_mqtt_event_handle_t event = event_data;
|
||
esp_mqtt_client_handle_t client = event->client;
|
||
int msg_id;
|
||
switch ((esp_mqtt_event_id_t)event_id) {
|
||
case MQTT_EVENT_CONNECTED:
|
||
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED - MQTT已连接");
|
||
// 订阅并取消订阅测试主题
|
||
msg_id = esp_mqtt_client_subscribe(client, CONFIG_MQTT_SUB_TOPIC, 1);
|
||
ESP_LOGI(TAG, "订阅发送成功, 消息ID=%d", msg_id);
|
||
status_led_blink_mode(2, 2); // LED2 心跳:MQTT连接正常
|
||
|
||
// 网络在线,启动离线数据补传任务
|
||
g_is_online = true;
|
||
ESP_LOGI(TAG, "网络在线,开始离线数据上传");
|
||
|
||
// 启动设备状态上报任务(如果配置了自动启动)
|
||
if (g_device_status_task_auto_start && !g_device_status_task_started) {
|
||
if (mqtt_start_device_status_task(g_report_interval_ms) == pdPASS) {
|
||
g_device_status_task_started = true;
|
||
g_device_status_task_auto_start = false;
|
||
ESP_LOGI(TAG, "MQTT连接时自动启动设备状态任务");
|
||
}
|
||
}
|
||
break;
|
||
case MQTT_EVENT_DISCONNECTED:
|
||
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED - MQTT已断开连接");
|
||
status_led_blink_mode(2, 1); // LED2 快闪:MQTT断开,正在重连
|
||
|
||
// 网络离线,停止离线数据补传任务
|
||
g_is_online = false;
|
||
ESP_LOGI(TAG, "网络离线,停止离线数据上传");
|
||
|
||
// 停止设备状态上报任务(避免存储新数据)
|
||
if (g_device_status_task_started) {
|
||
mqtt_stop_device_status_task();
|
||
g_device_status_task_started = false;
|
||
g_device_status_task_auto_start = true; // 重连后自动启动
|
||
ESP_LOGI(TAG, "设备状态任务已停止,将在重连时自动重启");
|
||
}
|
||
break;
|
||
|
||
case MQTT_EVENT_SUBSCRIBED:
|
||
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d, return code=0x%02x ", event->msg_id, (uint8_t)*event->data);
|
||
// 立即上报一次设备状态
|
||
char *status_json = build_device_status_json();
|
||
if (status_json != NULL) {
|
||
msg_id = esp_mqtt_client_publish(client, CONFIG_MQTT_PUB_TOPIC,
|
||
status_json, strlen(status_json), 0, 0);
|
||
if (msg_id >= 0) {
|
||
ESP_LOGI(TAG, "Device status published immediately, msg_id=%d", msg_id);
|
||
}
|
||
free(status_json);
|
||
}
|
||
break;
|
||
case MQTT_EVENT_UNSUBSCRIBED:
|
||
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
|
||
break;
|
||
case MQTT_EVENT_PUBLISHED:
|
||
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
|
||
break;
|
||
case MQTT_EVENT_DATA:
|
||
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
|
||
ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
|
||
ESP_LOGI(TAG, "DATA=%.*s", event->data_len, event->data);
|
||
|
||
// 解析MQTT控制指令(JSON格式)
|
||
if (event->data_len > 0 && event->data != NULL) {
|
||
// 创建临时缓冲区存储JSON数据
|
||
char *json_str = malloc(event->data_len + 1);
|
||
if (json_str != NULL) {
|
||
memcpy(json_str, event->data, event->data_len);
|
||
json_str[event->data_len] = '\0';
|
||
|
||
// 解析JSON
|
||
cJSON *root = cJSON_Parse(json_str);
|
||
if (root != NULL) {
|
||
// 检查是否是MODBUS轮询控制指令
|
||
cJSON *cmd = cJSON_GetObjectItem(root, "command");
|
||
if (cmd != NULL && cJSON_IsString(cmd)) {
|
||
if (strcmp(cmd->valuestring, "modbus_poll") == 0) {
|
||
// 解析轮询配置
|
||
cJSON *channel = cJSON_GetObjectItem(root, "channel");
|
||
cJSON *slave_addr = cJSON_GetObjectItem(root, "slave_addr");
|
||
cJSON *start_addr = cJSON_GetObjectItem(root, "start_addr");
|
||
cJSON *reg_count = cJSON_GetObjectItem(root, "reg_count");
|
||
cJSON *interval = cJSON_GetObjectItem(root, "interval");
|
||
cJSON *enabled = cJSON_GetObjectItem(root, "enabled");
|
||
|
||
// 验证必要字段
|
||
if (channel != NULL && cJSON_IsNumber(channel) &&
|
||
slave_addr != NULL && cJSON_IsNumber(slave_addr) &&
|
||
start_addr != NULL && cJSON_IsNumber(start_addr) &&
|
||
reg_count != NULL && cJSON_IsNumber(reg_count) &&
|
||
interval != NULL && cJSON_IsNumber(interval)) {
|
||
|
||
// 构建轮询配置
|
||
modbus_poll_config_t poll_config = {
|
||
.channel_num = channel->valueint,
|
||
.slave_addr = (uint8_t)slave_addr->valueint,
|
||
.start_addr = (uint16_t)start_addr->valueint,
|
||
.reg_count = (uint16_t)reg_count->valueint,
|
||
.poll_interval_ms = (uint32_t)interval->valueint,
|
||
.enabled = (enabled != NULL && cJSON_IsBool(enabled)) ? cJSON_IsTrue(enabled) : true
|
||
};
|
||
|
||
// 更新轮询配置
|
||
if (modbus_update_poll_config(&poll_config)) {
|
||
ESP_LOGI(TAG, "通过MQTT更新MODBUS轮询配置成功");
|
||
} else {
|
||
ESP_LOGE(TAG, "通过MQTT更新MODBUS轮询配置失败");
|
||
}
|
||
} else {
|
||
ESP_LOGE(TAG, "Missing required fields in MODBUS poll command");
|
||
}
|
||
}
|
||
}
|
||
cJSON_Delete(root);
|
||
} else {
|
||
ESP_LOGE(TAG, "Failed to parse JSON: %s", json_str);
|
||
}
|
||
free(json_str);
|
||
}
|
||
}
|
||
|
||
break;
|
||
case MQTT_EVENT_ERROR:
|
||
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
|
||
// 错误类型处理分支
|
||
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
|
||
ESP_LOGI(TAG, "Last error code reported from esp-tls: 0x%x", event->error_handle->esp_tls_last_esp_err);
|
||
ESP_LOGI(TAG, "Last tls stack error number: 0x%x", event->error_handle->esp_tls_stack_err);
|
||
ESP_LOGI(TAG, "Last captured errno : %d (%s)", event->error_handle->esp_transport_sock_errno,
|
||
strerror(event->error_handle->esp_transport_sock_errno));
|
||
} else if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
|
||
ESP_LOGI(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code);
|
||
} else {
|
||
ESP_LOGW(TAG, "Unknown error type: 0x%x", event->error_handle->error_type);
|
||
}
|
||
status_led_blink_mode(2, 0); // LED2 慢闪:MQTT错误
|
||
break;
|
||
default:
|
||
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
|
||
break;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief 启动MQTT客户端应用程序
|
||
*
|
||
* 初始化MQTT客户端配置,注册事件处理程序,并启动MQTT连接
|
||
*
|
||
* @param 无参数
|
||
* @return 无返回值
|
||
*/
|
||
void mqtt_app_start(void)
|
||
{
|
||
// 配置MQTT客户端结构体,设置代理服务器地址和证书验证
|
||
const esp_mqtt_client_config_t mqtt_cfg = {
|
||
.broker.address.uri = CONFIG_BROKER_URI,
|
||
// .broker.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start,
|
||
.credentials.client_id = CONFIG_MQTT_CLIENT_ID,
|
||
.credentials.username = CONFIG_MQTT_USERNAME,
|
||
.credentials.authentication.password = CONFIG_MQTT_PASSWORD,
|
||
};
|
||
|
||
ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size());
|
||
g_client = esp_mqtt_client_init(&mqtt_cfg);
|
||
/* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */
|
||
esp_mqtt_client_register_event(g_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
|
||
|
||
esp_mqtt_client_start(g_client);
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief 发布MQTT消息
|
||
*
|
||
* 提供一个外部接口来发布MQTT消息
|
||
*
|
||
* @param topic 发布的主题
|
||
* @param data 要发布的数据
|
||
* @param len 数据长度
|
||
* @param qos QoS级别 (0, 1, 或 2)
|
||
* @param retain 是否保留消息
|
||
* @return 消息ID,如果失败则返回负数
|
||
*/
|
||
int mqtt_publish_message(const char* topic, const char* data, int len, int qos, int retain)
|
||
{
|
||
if (g_client == NULL) {
|
||
ESP_LOGE(TAG, "MQTT client not initialized");
|
||
return -1;
|
||
}
|
||
|
||
// 如果网络离线,存储数据到本地Flash
|
||
if (!g_is_online) {
|
||
ESP_LOGW(TAG, "网络离线,数据存储到本地 (主题: %s, 长度: %d)", topic, len);
|
||
|
||
// 根据主题判断数据类型
|
||
// 设备状态数据主题通常包含 "status" 或特定的设备状态主题
|
||
// 传感器数据使用默认的发布主题,不包含特定状态关键字
|
||
offline_data_type_t data_type = OFFLINE_DATA_TYPE_MODBUS;
|
||
// 只有当主题明确包含"status"关键字时才认为是设备状态数据
|
||
// 避免将包含"device"的传感器数据主题误判为设备状态
|
||
if (strstr(topic, "status") != NULL) {
|
||
data_type = OFFLINE_DATA_TYPE_DEVICE_STATUS;
|
||
}
|
||
|
||
// 存储离线数据
|
||
esp_err_t ret = mqtt_store_offline(data, len, data_type);
|
||
if (ret == ESP_OK) {
|
||
ESP_LOGI(TAG, "Data stored to offline storage successfully");
|
||
return 0; // 返回0表示已存储(模拟成功)
|
||
} else {
|
||
ESP_LOGE(TAG, "Failed to store data to offline storage");
|
||
return -1;
|
||
}
|
||
}
|
||
|
||
// 网络在线,正常发布
|
||
int msg_id = esp_mqtt_client_publish(g_client, topic, data, len, qos, retain);
|
||
if (msg_id < 0) {
|
||
ESP_LOGE(TAG, "Failed to publish message to topic: %s", topic);
|
||
return -1;
|
||
}
|
||
|
||
return msg_id;
|
||
}
|
||
|
||
// ============================
|
||
// 离线数据补传任务
|
||
// ============================
|
||
|
||
/**
|
||
* @brief 检查并发布离线数据(网络在线时)
|
||
*
|
||
* @return ESP_OK 成功处理(包括没有数据的情况)
|
||
* ESP_FAIL 处理失败
|
||
*/
|
||
static esp_err_t publish_offline_data(void)
|
||
{
|
||
if (!offline_storage_has_data()) {
|
||
return ESP_OK;
|
||
}
|
||
|
||
// 使用静态缓冲区,避免栈溢出
|
||
offline_data_type_t data_type;
|
||
|
||
// 读取最旧的离线数据
|
||
esp_err_t ret = offline_storage_read_oldest(offline_data_buffer, sizeof(offline_data_buffer), &data_type);
|
||
if (ret != ESP_OK) {
|
||
if (ret == ESP_ERR_NOT_FOUND) {
|
||
return ESP_OK; // 没有数据,不是错误
|
||
}
|
||
ESP_LOGE(TAG, "Failed to read offline data: %s", esp_err_to_name(ret));
|
||
// 读取失败,删除可能损坏的数据记录,避免死循环
|
||
ESP_LOGW(TAG, "Deleting potentially corrupted data record");
|
||
offline_storage_delete_oldest();
|
||
return ret;
|
||
}
|
||
|
||
// 优先上传传感器数据,设备状态数据丢弃
|
||
if (data_type == OFFLINE_DATA_TYPE_DEVICE_STATUS) {
|
||
ESP_LOGW(TAG, "Discarding device status offline data (priority: sensor data only)");
|
||
offline_storage_delete_oldest();
|
||
return ESP_OK; // 返回成功,继续处理下一条
|
||
}
|
||
|
||
ESP_LOGI(TAG, "Publishing offline data (type=%d, size=%zu)", data_type, strlen(offline_data_buffer));
|
||
|
||
// 为补发数据添加标识:is_retrospective = true
|
||
// 解析JSON,添加字段后重新序列化
|
||
cJSON *root = cJSON_Parse(offline_data_buffer);
|
||
if (root != NULL) {
|
||
// 添加补发标识
|
||
cJSON_AddBoolToObject(root, "is_retrospective", true);
|
||
|
||
// 重新序列化为字符串
|
||
char *json_with_flag = cJSON_PrintUnformatted(root);
|
||
if (json_with_flag != NULL) {
|
||
ESP_LOGI(TAG, "Publishing offline data with retrospective flag: %s", json_with_flag);
|
||
|
||
// 使用新字符串发布
|
||
int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
|
||
json_with_flag, strlen(json_with_flag), 0, 0);
|
||
|
||
free(json_with_flag);
|
||
cJSON_Delete(root);
|
||
|
||
if (msg_id >= 0) {
|
||
ESP_LOGI(TAG, "Offline data with retrospective flag published successfully, msg_id=%d", msg_id);
|
||
|
||
// 发布成功,删除已上传的数据
|
||
ret = offline_storage_delete_oldest();
|
||
if (ret != ESP_OK) {
|
||
ESP_LOGW(TAG, "Failed to delete published offline data");
|
||
}
|
||
|
||
return ESP_OK;
|
||
} else {
|
||
ESP_LOGE(TAG, "Failed to publish offline data with retrospective flag");
|
||
}
|
||
} else {
|
||
ESP_LOGE(TAG, "Failed to re-serialize JSON with retrospective flag");
|
||
cJSON_Delete(root);
|
||
}
|
||
} else {
|
||
ESP_LOGW(TAG, "Failed to parse JSON for adding retrospective flag, publishing raw data: %s", offline_data_buffer);
|
||
|
||
// 如果JSON解析失败,直接发布原始数据(不添加标识)
|
||
int msg_id = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
|
||
offline_data_buffer, strlen(offline_data_buffer), 0, 0);
|
||
|
||
if (msg_id >= 0) {
|
||
ESP_LOGI(TAG, "Raw offline data published successfully, msg_id=%d", msg_id);
|
||
|
||
// 发布成功,删除已上传的数据
|
||
ret = offline_storage_delete_oldest();
|
||
if (ret != ESP_OK) {
|
||
ESP_LOGW(TAG, "Failed to delete published offline data");
|
||
}
|
||
|
||
return ESP_OK;
|
||
} else {
|
||
ESP_LOGE(TAG, "Failed to publish raw offline data");
|
||
}
|
||
}
|
||
|
||
// 发布失败也删除数据,避免重复尝试
|
||
ESP_LOGW(TAG, "Deleting failed data to avoid retry loop");
|
||
offline_storage_delete_oldest();
|
||
return ESP_FAIL;
|
||
}
|
||
|
||
/**
|
||
* @brief 存储离线数据(网络离线时)
|
||
*
|
||
* @param data 数据内容
|
||
* @param length 数据长度
|
||
* @param data_type 数据类型
|
||
* @return ESP_OK 成功
|
||
* ESP_FAIL 失败
|
||
*/
|
||
esp_err_t mqtt_store_offline(const char *data, size_t length, offline_data_type_t data_type)
|
||
{
|
||
if (g_is_online) {
|
||
ESP_LOGW(TAG, "Network is online, storing data not needed");
|
||
return ESP_OK; // 网络在线,不需要存储
|
||
}
|
||
|
||
ESP_LOGI(TAG, "Storing offline data (type=%d, size=%zu)", data_type, length);
|
||
|
||
esp_err_t ret = offline_storage_store(data, length, data_type);
|
||
if (ret != ESP_OK) {
|
||
ESP_LOGE(TAG, "Failed to store offline data: %s", esp_err_to_name(ret));
|
||
return ESP_FAIL;
|
||
}
|
||
|
||
// 获取存储使用情况
|
||
size_t used = 0, total = 0;
|
||
if (offline_storage_get_usage(&used, &total) == ESP_OK) {
|
||
ESP_LOGI(TAG, "Storage usage: %zu / %zu bytes (%.1f%%)",
|
||
used, total, (used * 100.0) / total);
|
||
}
|
||
|
||
return ESP_OK;
|
||
}
|
||
|
||
/**
|
||
* @brief 离线数据补传任务
|
||
*
|
||
* 当网络在线时,持续检查并上传离线存储的数据
|
||
*/
|
||
static void offline_upload_task(void *pvParameters)
|
||
{
|
||
ESP_LOGI(TAG, "Offline data upload task started");
|
||
|
||
while (true) {
|
||
if (g_is_online && g_client != NULL) {
|
||
// 检查并上传离线数据
|
||
if (offline_storage_has_data()) {
|
||
ESP_LOGI(TAG, "Found %u offline data files, uploading...",
|
||
offline_storage_get_count());
|
||
|
||
// 尝试发布数据,每次处理一条
|
||
publish_offline_data();
|
||
}
|
||
}
|
||
|
||
// 每100毫秒检查一次(提高上传速率)
|
||
vTaskDelay(pdMS_TO_TICKS(100));
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief 启动离线数据补传任务
|
||
*
|
||
* @return pdTRUE 成功, pdFALSE 失败
|
||
*/
|
||
BaseType_t mqtt_start_offline_upload_task(void)
|
||
{
|
||
if (offline_upload_task_handle != NULL) {
|
||
ESP_LOGW(TAG, "Offline upload task already running");
|
||
return pdTRUE;
|
||
}
|
||
|
||
BaseType_t ret = xTaskCreate(offline_upload_task, "offline_upload",
|
||
8192, NULL, 4, &offline_upload_task_handle);
|
||
|
||
if (ret == pdPASS) {
|
||
ESP_LOGI(TAG, "Offline data upload task started");
|
||
} else {
|
||
ESP_LOGE(TAG, "Failed to create offline data upload task");
|
||
}
|
||
|
||
return ret;
|
||
}
|
||
|
||
/**
|
||
* @brief 停止离线数据补传任务
|
||
*/
|
||
void mqtt_stop_offline_upload_task(void)
|
||
{
|
||
if (offline_upload_task_handle != NULL) {
|
||
ESP_LOGI(TAG, "Stopping offline data upload task...");
|
||
|
||
TaskHandle_t temp_handle = offline_upload_task_handle;
|
||
offline_upload_task_handle = NULL;
|
||
vTaskDelete(temp_handle);
|
||
|
||
vTaskDelay(pdMS_TO_TICKS(200));
|
||
ESP_LOGI(TAG, "Offline data upload task stopped");
|
||
}
|
||
}
|