Files
DistributedCollectorGateway/components/MQTT_ESP/MQTT_ESP.c

488 lines
18 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include <stdio.h>
#include "MQTT_ESP.h"
#include "STATUS_LED.h"
#include "MODBUS_ESP.h"
#include "SNTP_ESP.h"
#include "cJSON.h"
#include "esp_wifi.h"
#include "esp_mac.h"
#include "esp_system.h"
#include "esp_netif.h"
#include <time.h>
static const char *TAG = "mqtt_esp";
// 全局MQTT客户端句柄
static esp_mqtt_client_handle_t g_client = NULL;
// ============================
// 设备状态上报任务相关
// ============================
static TaskHandle_t device_status_task_handle = NULL;
static uint32_t g_report_interval_ms = 10000; // 默认10秒上报一次
static SemaphoreHandle_t report_interval_mutex = NULL;
/**
* @brief 获取设备MAC地址字符串
*/
static void get_device_mac(char *mac_str, size_t max_len)
{
uint8_t mac[6];
esp_read_mac(mac, ESP_MAC_WIFI_STA);
snprintf(mac_str, max_len, "%02X:%02X:%02X:%02X:%02X:%02X",
mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
}
/**
* @brief 获取设备运行时间(秒)
*/
static uint32_t get_device_uptime(void)
{
return (uint32_t)(xTaskGetTickCount() * portTICK_PERIOD_MS / 1000);
}
/**
* @brief 获取设备IP地址字符串
*/
static void get_device_ip(char *ip_str, size_t max_len)
{
esp_netif_t *netif = esp_netif_get_handle_from_ifkey("ETH_DEF");
if (netif != NULL) {
esp_netif_ip_info_t ip_info;
if (esp_netif_get_ip_info(netif, &ip_info) == ESP_OK) {
snprintf(ip_str, max_len, IPSTR, IP2STR(&ip_info.ip));
} else {
strncpy(ip_str, "N/A", max_len);
}
} else {
strncpy(ip_str, "N/A", max_len);
}
}
/**
* @brief 获取当前时间字符串
*/
static void get_current_time(char *time_str, size_t max_len)
{
// 使用SNTP组件获取格式化时间
if (sntp_esp_get_formatted_time(time_str, max_len, "%Y-%m-%d %H:%M:%S") != ESP_OK) {
// 如果获取失败,使用本地时间
time_t now;
time(&now);
struct tm timeinfo;
localtime_r(&now, &timeinfo);
strftime(time_str, max_len, "%Y-%m-%d %H:%M:%S", &timeinfo);
}
}
/**
* @brief 获取LED状态描述中文
*/
static const char* get_led_state_desc(led_state_t state)
{
switch (state) {
case LED_OFF: return "关闭";
case LED_ON: return "常亮";
case LED_BLINK_SLOW: return "慢闪";
case LED_BLINK_FAST: return "快闪";
case LED_HEARTBEAT: return "心跳";
default: return "未知";
}
}
/**
* @brief 获取运行时间描述(中文)
*/
static void get_uptime_desc(uint32_t uptime_sec, char *desc, size_t max_len)
{
uint32_t days = uptime_sec / 86400;
uint32_t hours = (uptime_sec % 86400) / 3600;
uint32_t minutes = (uptime_sec % 3600) / 60;
uint32_t seconds = uptime_sec % 60;
if (days > 0) {
snprintf(desc, max_len, "%lu天%lu小时%lu分%lu秒", days, hours, minutes, seconds);
} else if (hours > 0) {
snprintf(desc, max_len, "%lu小时%lu分%lu秒", hours, minutes, seconds);
} else if (minutes > 0) {
snprintf(desc, max_len, "%lu分%lu秒", minutes, seconds);
} else {
snprintf(desc, max_len, "%lu秒", seconds);
}
}
/**
* @brief 构建设备状态JSON
*/
static char* build_device_status_json(void)
{
char mac_str[18];
char ip_str[16];
char time_str[32];
char uptime_desc[64];
get_device_mac(mac_str, sizeof(mac_str));
get_device_ip(ip_str, sizeof(ip_str));
get_current_time(time_str, sizeof(time_str));
get_uptime_desc(get_device_uptime(), uptime_desc, sizeof(uptime_desc));
cJSON *root = cJSON_CreateObject();
if (root == NULL) {
return NULL;
}
// 基本信息
cJSON_AddStringToObject(root, "message_type", "device_status");
cJSON_AddStringToObject(root, "mac_address", mac_str);
cJSON_AddStringToObject(root, "ip_address", ip_str);
cJSON_AddStringToObject(root, "chip_model", "ESP32-S3");
// IDF 版本
cJSON_AddStringToObject(root, "idf_version", esp_get_idf_version());
// 运行状态
cJSON_AddNumberToObject(root, "uptime", get_device_uptime());
cJSON_AddStringToObject(root, "uptime_desc", uptime_desc); // 中文描述
cJSON_AddNumberToObject(root, "free_heap", esp_get_free_heap_size());
cJSON_AddStringToObject(root, "status", "online");
cJSON_AddStringToObject(root, "status_desc", "在线"); // 中文描述
cJSON_AddStringToObject(root, "update_time", time_str); // 更新时间
// LED 状态(带中文描述)
led_state_t led1_state = status_led_get_state(1);
led_state_t led2_state = status_led_get_state(2);
cJSON_AddNumberToObject(root, "led1_state", (int)led1_state);
cJSON_AddStringToObject(root, "led1_desc", get_led_state_desc(led1_state)); // LED1: 网络状态
cJSON_AddNumberToObject(root, "led2_state", (int)led2_state);
cJSON_AddStringToObject(root, "led2_desc", get_led_state_desc(led2_state)); // LED2: 通信状态
// LED 功能说明(中文)
cJSON_AddStringToObject(root, "led1_function", "网络状态灯");
cJSON_AddStringToObject(root, "led2_function", "通信状态灯");
// MODBUS 轮询状态
modbus_poll_config_t *modbus_config = modbus_get_current_config();
if (modbus_config != NULL) {
cJSON_AddNumberToObject(root, "modbus_enabled", modbus_config->enabled);
cJSON_AddStringToObject(root, "modbus_enabled_desc", modbus_config->enabled ? "启用" : "禁用");
cJSON_AddNumberToObject(root, "modbus_channel", modbus_config->channel_num);
cJSON_AddStringToObject(root, "modbus_channel_desc", modbus_config->channel_num == 0 ? "通道0 (UART0)" : "通道1 (UART2)");
cJSON_AddNumberToObject(root, "modbus_slave_addr", modbus_config->slave_addr);
cJSON_AddNumberToObject(root, "modbus_interval", modbus_config->poll_interval_ms);
} else {
cJSON_AddNumberToObject(root, "modbus_enabled", 0);
cJSON_AddStringToObject(root, "modbus_enabled_desc", "未配置");
cJSON_AddNumberToObject(root, "modbus_channel", 0);
cJSON_AddStringToObject(root, "modbus_channel_desc", "N/A");
cJSON_AddNumberToObject(root, "modbus_slave_addr", 0);
cJSON_AddNumberToObject(root, "modbus_interval", 0);
}
// 内存使用情况
uint32_t total_heap = esp_get_free_heap_size();
cJSON_AddStringToObject(root, "heap_status", total_heap > 100000 ? "充足" : (total_heap > 50000 ? "一般" : "紧张"));
char *json_str = cJSON_Print(root);
cJSON_Delete(root);
return json_str;
}
/**
* @brief 设备状态上报任务
*/
static void device_status_report_task(void *arg)
{
ESP_LOGI(TAG, "Device status report task started");
while (1) {
// 检查是否应该退出
if (device_status_task_handle == NULL) {
break;
}
// 检查MQTT是否已连接
if (g_client != NULL) {
// 构建设备状态JSON
char *status_json = build_device_status_json();
if (status_json != NULL) {
// 发布到MQTT
int ret = esp_mqtt_client_publish(g_client, CONFIG_MQTT_PUB_TOPIC,
status_json, strlen(status_json), 0, 0);
if (ret >= 0) {
ESP_LOGI(TAG, "Device status published, msg_id=%d", ret);
ESP_LOGD(TAG, "Status: %s", status_json);
} else {
ESP_LOGW(TAG, "Failed to publish device status");
}
free(status_json);
}
}
// 获取上报间隔
uint32_t interval;
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
interval = g_report_interval_ms;
xSemaphoreGive(report_interval_mutex);
// 等待下一次上报
vTaskDelay(pdMS_TO_TICKS(interval));
}
ESP_LOGI(TAG, "Device status report task exiting");
device_status_task_handle = NULL;
vTaskDelete(NULL);
}
BaseType_t mqtt_start_device_status_task(uint32_t report_interval_ms)
{
// 如果任务已存在,先停止
if (device_status_task_handle != NULL) {
mqtt_stop_device_status_task();
vTaskDelay(pdMS_TO_TICKS(100));
}
// 创建互斥量
if (report_interval_mutex == NULL) {
report_interval_mutex = xSemaphoreCreateMutex();
if (report_interval_mutex == NULL) {
ESP_LOGE(TAG, "Failed to create report interval mutex");
return pdFALSE;
}
}
// 更新上报间隔
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
xSemaphoreGive(report_interval_mutex);
// 创建任务
BaseType_t ret = xTaskCreate(device_status_report_task, "dev_status",
4096, NULL, 5, &device_status_task_handle);
if (ret == pdPASS) {
ESP_LOGI(TAG, "Device status report task started (interval=%dms)", g_report_interval_ms);
} else {
ESP_LOGE(TAG, "Failed to create device status report task");
}
return ret;
}
void mqtt_stop_device_status_task(void)
{
if (device_status_task_handle != NULL) {
ESP_LOGI(TAG, "Stopping device status report task...");
TaskHandle_t temp_handle = device_status_task_handle;
device_status_task_handle = NULL;
vTaskDelete(temp_handle);
vTaskDelay(pdMS_TO_TICKS(200));
ESP_LOGI(TAG, "Device status report task stopped");
}
}
void mqtt_update_report_interval(uint32_t report_interval_ms)
{
if (report_interval_mutex != NULL) {
xSemaphoreTake(report_interval_mutex, portMAX_DELAY);
g_report_interval_ms = report_interval_ms > 0 ? report_interval_ms : 10000;
xSemaphoreGive(report_interval_mutex);
ESP_LOGI(TAG, "Device status report interval updated to %dms", g_report_interval_ms);
}
}
/**
* @brief MQTT事件处理函数
*
* 处理MQTT客户端的各种事件包括连接、订阅、发布、数据接收和错误处理。
*
* @param handler_args 事件处理器参数
* @param base 事件基础类型
* @param event_id 事件ID
* @param event_data 指向MQTT事件数据的指针
*/
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32, base, event_id);
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
int msg_id;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
// 订阅并取消订阅测试主题
msg_id = esp_mqtt_client_subscribe(client, CONFIG_MQTT_SUB_TOPIC, 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
status_led_blink_mode(2, 2); // LED2 心跳MQTT连接正常
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
status_led_blink_mode(2, 1); // LED2 快闪MQTT断开正在重连
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d, return code=0x%02x ", event->msg_id, (uint8_t)*event->data);
// 立即上报一次设备状态
char *status_json = build_device_status_json();
if (status_json != NULL) {
msg_id = esp_mqtt_client_publish(client, CONFIG_MQTT_PUB_TOPIC,
status_json, strlen(status_json), 0, 0);
if (msg_id >= 0) {
ESP_LOGI(TAG, "Device status published immediately, msg_id=%d", msg_id);
}
free(status_json);
}
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
ESP_LOGI(TAG, "DATA=%.*s", event->data_len, event->data);
// 解析MQTT控制指令JSON格式
if (event->data_len > 0 && event->data != NULL) {
// 创建临时缓冲区存储JSON数据
char *json_str = malloc(event->data_len + 1);
if (json_str != NULL) {
memcpy(json_str, event->data, event->data_len);
json_str[event->data_len] = '\0';
// 解析JSON
cJSON *root = cJSON_Parse(json_str);
if (root != NULL) {
// 检查是否是MODBUS轮询控制指令
cJSON *cmd = cJSON_GetObjectItem(root, "command");
if (cmd != NULL && cJSON_IsString(cmd)) {
if (strcmp(cmd->valuestring, "modbus_poll") == 0) {
// 解析轮询配置
cJSON *channel = cJSON_GetObjectItem(root, "channel");
cJSON *slave_addr = cJSON_GetObjectItem(root, "slave_addr");
cJSON *start_addr = cJSON_GetObjectItem(root, "start_addr");
cJSON *reg_count = cJSON_GetObjectItem(root, "reg_count");
cJSON *interval = cJSON_GetObjectItem(root, "interval");
cJSON *enabled = cJSON_GetObjectItem(root, "enabled");
// 验证必要字段
if (channel != NULL && cJSON_IsNumber(channel) &&
slave_addr != NULL && cJSON_IsNumber(slave_addr) &&
start_addr != NULL && cJSON_IsNumber(start_addr) &&
reg_count != NULL && cJSON_IsNumber(reg_count) &&
interval != NULL && cJSON_IsNumber(interval)) {
// 构建轮询配置
modbus_poll_config_t poll_config = {
.channel_num = channel->valueint,
.slave_addr = (uint8_t)slave_addr->valueint,
.start_addr = (uint16_t)start_addr->valueint,
.reg_count = (uint16_t)reg_count->valueint,
.poll_interval_ms = (uint32_t)interval->valueint,
.enabled = (enabled != NULL && cJSON_IsBool(enabled)) ? cJSON_IsTrue(enabled) : true
};
// 更新轮询配置
if (modbus_update_poll_config(&poll_config)) {
ESP_LOGI(TAG, "MODBUS poll config updated via MQTT");
} else {
ESP_LOGE(TAG, "Failed to update MODBUS poll config");
}
} else {
ESP_LOGE(TAG, "Missing required fields in MODBUS poll command");
}
}
}
cJSON_Delete(root);
} else {
ESP_LOGE(TAG, "Failed to parse JSON: %s", json_str);
}
free(json_str);
}
}
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
// 错误类型处理分支
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
ESP_LOGI(TAG, "Last error code reported from esp-tls: 0x%x", event->error_handle->esp_tls_last_esp_err);
ESP_LOGI(TAG, "Last tls stack error number: 0x%x", event->error_handle->esp_tls_stack_err);
ESP_LOGI(TAG, "Last captured errno : %d (%s)", event->error_handle->esp_transport_sock_errno,
strerror(event->error_handle->esp_transport_sock_errno));
} else if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
ESP_LOGI(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code);
} else {
ESP_LOGW(TAG, "Unknown error type: 0x%x", event->error_handle->error_type);
}
status_led_blink_mode(2, 0); // LED2 慢闪MQTT错误
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
/**
* @brief 启动MQTT客户端应用程序
*
* 初始化MQTT客户端配置注册事件处理程序并启动MQTT连接
*
* @param 无参数
* @return 无返回值
*/
void mqtt_app_start(void)
{
// 配置MQTT客户端结构体设置代理服务器地址和证书验证
const esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_BROKER_URI,
// .broker.verification.certificate = (const char *)mqtt_eclipseprojects_io_pem_start,
.credentials.client_id = CONFIG_MQTT_CLIENT_ID,
.credentials.username = CONFIG_MQTT_USERNAME,
.credentials.authentication.password = CONFIG_MQTT_PASSWORD,
};
ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size());
g_client = esp_mqtt_client_init(&mqtt_cfg);
/* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */
esp_mqtt_client_register_event(g_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
esp_mqtt_client_start(g_client);
}
/**
* @brief 发布MQTT消息
*
* 提供一个外部接口来发布MQTT消息
*
* @param topic 发布的主题
* @param data 要发布的数据
* @param len 数据长度
* @param qos QoS级别 (0, 1, 或 2)
* @param retain 是否保留消息
* @return 消息ID如果失败则返回负数
*/
int mqtt_publish_message(const char* topic, const char* data, int len, int qos, int retain)
{
if (g_client == NULL) {
ESP_LOGE(TAG, "MQTT client not initialized");
return -1;
}
int msg_id = esp_mqtt_client_publish(g_client, topic, data, len, qos, retain);
if (msg_id < 0) {
ESP_LOGE(TAG, "Failed to publish message to topic: %s", topic);
return -1;
}
ESP_LOGI(TAG, "Published message to topic: %s, msg_id: %d", topic, msg_id);
return msg_id;
}