#include #include "MQTT_ESP.h" #include "STATUS_LED.h" #include "MODBUS_ESP.h" #include "SNTP_ESP.h" #include "cJSON.h" #include "esp_wifi.h" #include "esp_mac.h" #include "esp_system.h" #include "esp_netif.h" #include static const char *TAG = "mqtt_esp"; // 全局MQTT客户端句柄 static esp_mqtt_client_handle_t g_client = NULL; // ============================ // 设备状态上报任务相关 // ============================ static TaskHandle_t device_status_task_handle = NULL; static uint32_t g_report_interval_ms = 10000; // 默认10秒上报一次 static SemaphoreHandle_t report_interval_mutex = NULL; /** * @brief 获取设备MAC地址字符串 */ static void get_device_mac(char *mac_str, size_t max_len) { uint8_t mac[6]; esp_read_mac(mac, ESP_MAC_WIFI_STA); snprintf(mac_str, max_len, "%02X:%02X:%02X:%02X:%02X:%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); } /** * @brief 获取设备运行时间(秒) */ static uint32_t get_device_uptime(void) { return (uint32_t)(xTaskGetTickCount() * portTICK_PERIOD_MS / 1000); } /** * @brief 获取设备IP地址字符串 */ static void get_device_ip(char *ip_str, size_t max_len) { esp_netif_t *netif = esp_netif_get_handle_from_ifkey("ETH_DEF"); if (netif != NULL) { esp_netif_ip_info_t ip_info; if (esp_netif_get_ip_info(netif, &ip_info) == ESP_OK) { snprintf(ip_str, max_len, IPSTR, IP2STR(&ip_info.ip)); } else { strncpy(ip_str, "N/A", max_len); } } else { strncpy(ip_str, "N/A", max_len); } } /** * @brief 获取当前时间字符串 */ static void get_current_time(char *time_str, size_t max_len) { // 使用SNTP组件获取格式化时间 if (sntp_esp_get_formatted_time(time_str, max_len, "%Y-%m-%d %H:%M:%S") != ESP_OK) { // 如果获取失败,使用本地时间 time_t now; time(&now); struct tm timeinfo; localtime_r(&now, &timeinfo); strftime(time_str, max_len, "%Y-%m-%d %H:%M:%S", &timeinfo); } } /** * @brief 获取LED状态描述(中文) */ static const char* get_led_state_desc(led_state_t state) { switch (state) { case LED_OFF: return "关闭"; case LED_ON: return "常亮"; case LED_BLINK_SLOW: return "慢闪"; case LED_BLINK_FAST: return "快闪"; case LED_HEARTBEAT: return "心跳"; default: return "未知"; } } /** * @brief 获取运行时间描述(中文) */ static void get_uptime_desc(uint32_t uptime_sec, char *desc, size_t max_len) { uint32_t days = uptime_sec / 86400; uint32_t hours = (uptime_sec % 86400) / 3600; uint32_t minutes = (uptime_sec % 3600) / 60; uint32_t seconds = uptime_sec % 60; if (days > 0) { snprintf(desc, max_len, "%lu天%lu小时%lu分%lu秒", days, hours, minutes, seconds); } else if (hours > 0) { snprintf(desc, max_len, "%lu小时%lu分%lu秒", hours, minutes, seconds); } else if (minutes > 0) { snprintf(desc, max_len, "%lu分%lu秒", minutes, seconds); } else { snprintf(desc, max_len, "%lu秒", seconds); } } /** * @brief 构建设备状态JSON */ static char* build_device_status_json(void) { char mac_str[18]; char ip_str[16]; char time_str[32]; char uptime_desc[64]; get_device_mac(mac_str, sizeof(mac_str)); get_device_ip(ip_str, sizeof(ip_str)); get_current_time(time_str, sizeof(time_str)); get_uptime_desc(get_device_uptime(), uptime_desc, sizeof(uptime_desc)); cJSON *root = cJSON_CreateObject(); if (root == NULL) { return NULL; } // 基本信息 cJSON_AddStringToObject(root, "message_type", "device_status"); cJSON_AddStringToObject(root, "mac_address", mac_str); cJSON_AddStringToObject(root, "ip_address", ip_str); cJSON_AddStringToObject(root, "chip_model", "ESP32-S3"); // IDF 版本 cJSON_AddStringToObject(root, "idf_version", esp_get_idf_version()); // 运行状态 cJSON_AddNumberToObject(root, "uptime", get_device_uptime()); cJSON_AddStringToObject(root, "uptime_desc", uptime_desc); // 中文描述 cJSON_AddNumberToObject(root, "free_heap", esp_get_free_heap_size()); cJSON_AddStringToObject(root, "status", "online"); cJSON_AddStringToObject(root, "status_desc", "在线"); // 中文描述 cJSON_AddStringToObject(root, "update_time", time_str); // 更新时间 // LED 状态(带中文描述) led_state_t led1_state = status_led_get_state(1); led_state_t led2_state = status_led_get_state(2); cJSON_AddNumberToObject(root, "led1_state", (int)led1_state); cJSON_AddStringToObject(root, "led1_desc", get_led_state_desc(led1_state)); // LED1: 网络状态 cJSON_AddNumberToObject(root, "led2_state", (int)led2_state); cJSON_AddStringToObject(root, "led2_desc", get_led_state_desc(led2_state)); // LED2: 通信状态 // LED 功能说明(中文) cJSON_AddStringToObject(root, "led1_function", "网络状态灯"); cJSON_AddStringToObject(root, "led2_function", "通信状态灯"); // MODBUS 轮询状态 modbus_poll_config_t *modbus_config = modbus_get_current_config(); if (modbus_config != NULL) { cJSON_AddNumberToObject(root, "modbus_enabled", modbus_config->enabled); cJSON_AddStringToObject(root, "modbus_enabled_desc", modbus_config->enabled ? "启用" : "禁用"); cJSON_AddNumberToObject(root, "modbus_channel", modbus_config->channel_num); cJSON_AddStringToObject(root, "modbus_channel_desc", modbus_config->channel_num == 0 ? "通道0 (UART0)" : "通道1 (UART2)"); cJSON_AddNumberToObject(root, "modbus_slave_addr", modbus_config->slave_addr); cJSON_AddNumberToObject(root, "modbus_interval", modbus_config->poll_interval_ms); } else { cJSON_AddNumberToObject(root, "modbus_enabled", 0); cJSON_AddStringToObject(root, "modbus_enabled_desc", "未配置"); cJSON_AddNumberToObject(root, "modbus_channel", 0); cJSON_AddStringToObject(root, "modbus_channel_desc", "N/A"); cJSON_AddNumberToObject(root, "modbus_slave_addr", 0); cJSON_AddNumberToObject(root, "modbus_interval", 0); } // 内存使用情况 uint32_t total_heap = esp_get_free_heap_size(); cJSON_AddStringToObject(root, "heap_status", total_heap > 100000 ? "充足" : (total_heap > 50000 ? "一般" : "紧张")); char *json_str = cJSON_Print(root); cJSON_Delete(root); return json_str; } /** * @brief 设备状态上报任务 */ static void device_status_report_task(void *arg) { ESP_LOGI(TAG, "Device status report task started"); while (1) { // 检查是否应该退出 if (device_status_task_handle == NULL) { break; } // 检查MQTT是否已连接 if (g_client != NULL) { // 构建设备状态JSON char *status_json = build_device_status_json(); if (status_json != NULL) { // 发布到MQTT int ret = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC, status_json, strlen(status_json), 0, 0); if (ret >= 0) { ESP_LOGI(TAG, "Device status published, msg_id=%d", ret); ESP_LOGD(TAG, "Status: %s", status_json); } else { ESP_LOGW(TAG, "Failed to publish device status"); } free(status_json); } } // 获取上报间隔 uint32_t interval; xSemaphoreTake(report_interval_mutex, portMAX_DELAY); interval = g_report_interval_ms; xSemaphoreGive(report_interval_mutex); // 等待下一次上报 vTaskDelay(pdMS_TO_TICKS(interval)); } ESP_LOGI(TAG, "Device status report task exiting"); device_status_task_handle = NULL; vTaskDelete(NULL); } BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms) { // 如果任务已存在,先停止 if (device_status_task_handle != NULL) { mqtt_stop_device_status_task(); vTaskDelay(pdMS_TO_TICKS(100)); } // 创建互斥量 if (report_interval_mutex == NULL) { report_interval_mutex = xSemaphoreCreateMutex(); if (report_interval_mutex == NULL) { ESP_LOGE(TAG, "Failed to create report interval mutex"); return pdFALSE; } } // 更新上报间隔 xSemaphoreTake(report_interval_mutex, portMAX_DELAY); g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000; xSemaphoreGive(report_interval_mutex); // 创建任务 BaseType_t ret = xTaskCreate(device_status_report_task, "dev_status", 4096, NULL, 5, &device_status_task_handle); if (ret == pdPASS) { ESP_LOGI(TAG, "Device status report task started (interval=%dms)", g_report_interval_ms); } else { ESP_LOGE(TAG, "Failed to create device status report task"); } return ret; } void mqtt_stop_device_status_task(void) { if (device_status_task_handle != NULL) { ESP_LOGI(TAG, "Stopping device status report task..."); TaskHandle_t temp_handle = device_status_task_handle; device_status_task_handle = NULL; vTaskDelete(temp_handle); vTaskDelay(pdMS_TO_TICKS(200)); ESP_LOGI(TAG, "Device status report task stopped"); } } void mqtt_update_report_interval(uint32_t report_interval_ms) { if (report_interval_mutex != NULL) { xSemaphoreTake(report_interval_mutex, portMAX_DELAY); g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000; xSemaphoreGive(report_interval_mutex); ESP_LOGI(TAG, "Device status report interval updated to %dms", g_report_interval_ms); } } /** * @brief MQTT事件处理函数 * * 处理MQTT客户端的各种事件,包括连接、订阅、发布、数据接收和错误处理。 * * @param handler_args 事件处理器参数 * @param base 事件基础类型 * @param event_id 事件ID * @param event_data 指向MQTT事件数据的指针 */ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32, base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; int msg_id; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); // 订阅并取消订阅测试主题 msg_id = esp_mqtt_client_subscribe(client, CONFIG_MQTT_SUB_TOPIC, 1); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); status_led_blink_mode(2, 2); // LED2 心跳:MQTT连接正常 break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); status_led_blink_mode(2, 1); // LED2 快闪:MQTT断开,正在重连 break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d, return code=0x%02x ", event->msg_id, (uint8_t)*event->data); // 立即上报一次设备状态 char *status_json = build_device_status_json(); if (status_json != NULL) { msg_id = esp_mqtt_client_publish(client, CONFIG_MQTT_PUB_TOPIC, status_json, strlen(status_json), 0, 0); if (msg_id >= 0) { ESP_LOGI(TAG, "Device status published immediately, msg_id=%d", msg_id); } free(status_json); } break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic); ESP_LOGI(TAG, "DATA=%.*s", event->data_len, event->data); // 解析MQTT控制指令(JSON格式) if (event->data_len > 0 && event->data != NULL) { // 创建临时缓冲区存储JSON数据 char *json_str = malloc(event->data_len + 1); if (json_str != NULL) { memcpy(json_str, event->data, event->data_len); json_str[event->data_len] = '\0'; // 解析JSON cJSON *root = cJSON_Parse(json_str); if (root != NULL) { // 检查是否是MODBUS轮询控制指令 cJSON *cmd = cJSON_GetObjectItem(root, "command"); if (cmd != NULL && cJSON_IsString(cmd)) { if (strcmp(cmd->valuestring, "modbus_poll") == 0) { // 解析轮询配置 cJSON *channel = cJSON_GetObjectItem(root, "channel"); cJSON *slave_addr = cJSON_GetObjectItem(root, "slave_addr"); cJSON *start_addr = cJSON_GetObjectItem(root, "start_addr"); cJSON *reg_count = cJSON_GetObjectItem(root, "reg_count"); cJSON *interval = cJSON_GetObjectItem(root, "interval"); cJSON *enabled = cJSON_GetObjectItem(root, "enabled"); // 验证必要字段 if (channel != NULL && cJSON_IsNumber(channel) && slave_addr != NULL && cJSON_IsNumber(slave_addr) && start_addr != NULL && cJSON_IsNumber(start_addr) && reg_count != NULL && cJSON_IsNumber(reg_count) && interval != NULL && cJSON_IsNumber(interval)) { // 构建轮询配置 modbus_poll_config_t poll_config = { .channel_num = channel->valueint, .slave_addr = (uint8_t)slave_addr->valueint, .start_addr = (uint16_t)start_addr->valueint, .reg_count = (uint16_t)reg_count->valueint, .poll_interval_ms = (uint32_t)interval->valueint, .enabled = (enabled != NULL && cJSON_IsBool(enabled)) ? cJSON_IsTrue(enabled) : true }; // 更新轮询配置 if (modbus_update_poll_config(&poll_config)) { ESP_LOGI(TAG, "MODBUS poll config updated via MQTT"); } else { ESP_LOGE(TAG, "Failed to update MODBUS poll config"); } } else { ESP_LOGE(TAG, "Missing required fields in MODBUS poll command"); } } } cJSON_Delete(root); } else { ESP_LOGE(TAG, "Failed to parse JSON: %s", json_str); } free(json_str); } } break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); // 错误类型处理分支 if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { ESP_LOGI(TAG, "Last error code reported from esp-tls: 0x%x", event->error_handle->esp_tls_last_esp_err); ESP_LOGI(TAG, "Last tls stack error number: 0x%x", event->error_handle->esp_tls_stack_err); ESP_LOGI(TAG, "Last captured errno : %d (%s)", event->error_handle->esp_transport_sock_errno, strerror(event->error_handle->esp_transport_sock_errno)); } else if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) { ESP_LOGI(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code); } else { ESP_LOGW(TAG, "Unknown error type: 0x%x", event->error_handle->error_type); } status_led_blink_mode(2, 0); // LED2 慢闪:MQTT错误 break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } } /** * @brief 启动MQTT客户端应用程序 * * 初始化MQTT客户端配置,注册事件处理程序,并启动MQTT连接 * * @param 无参数 * @return 无返回值 */ void mqtt_app_start(void) { // 配置MQTT客户端结构体,设置代理服务器地址和证书验证 const esp_mqtt_client_config_t mqtt_cfg = { .broker.address.uri = CONFIG_BROKER_URI, // .broker.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start, .credentials.client_id = CONFIG_MQTT_CLIENT_ID, .credentials.username = CONFIG_MQTT_USERNAME, .credentials.authentication.password = CONFIG_MQTT_PASSWORD, }; ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size()); g_client = esp_mqtt_client_init(&mqtt_cfg); /* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */ esp_mqtt_client_register_event(g_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); esp_mqtt_client_start(g_client); } /** * @brief 发布MQTT消息 * * 提供一个外部接口来发布MQTT消息 * * @param topic 发布的主题 * @param data 要发布的数据 * @param len 数据长度 * @param qos QoS级别 (0, 1, 或 2) * @param retain 是否保留消息 * @return 消息ID,如果失败则返回负数 */ int mqtt_publish_message(const char* topic, const char* data, int len, int qos, int retain) { if (g_client == NULL) { ESP_LOGE(TAG, "MQTT client not initialized"); return -1; } int msg_id = esp_mqtt_client_publish(g_client, topic, data, len, qos, retain); if (msg_id < 0) { ESP_LOGE(TAG, "Failed to publish message to topic: %s", topic); return -1; } ESP_LOGI(TAG, "Published message to topic: %s, msg_id: %d", topic, msg_id); return msg_id; }