#include #include #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "freertos/semphr.h" #include "esp_system.h" #include "esp_log.h" #include "nvs_flash.h" #include "nvs.h" #include "plant_mqtt.h" #include "mqtt_client.h" // ESP-IDF MQTT client header static const char *TAG = "MQTT_CLIENT"; // NVS namespace for MQTT settings #define MQTT_NVS_NAMESPACE "mqtt_config" // MQTT client handle static esp_mqtt_client_handle_t s_mqtt_client = NULL; // Current state static mqtt_state_t s_mqtt_state = MQTT_STATE_DISCONNECTED; // Callbacks static mqtt_connected_callback_t s_connected_callback = NULL; static mqtt_disconnected_callback_t s_disconnected_callback = NULL; static mqtt_data_callback_t s_data_callback = NULL; // Mutex for thread safety static SemaphoreHandle_t s_mqtt_mutex = NULL; // Forward declarations static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data); static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event); esp_err_t mqtt_client_init(void) { if (s_mqtt_client != NULL) { ESP_LOGW(TAG, "MQTT client already initialized"); return ESP_OK; } // Create mutex s_mqtt_mutex = xSemaphoreCreateMutex(); if (s_mqtt_mutex == NULL) { ESP_LOGE(TAG, "Failed to create mutex"); return ESP_FAIL; } // Try to load credentials from NVS char url[128] = {0}; char username[64] = {0}; char password[64] = {0}; if (mqtt_client_get_config(url, sizeof(url), username, sizeof(username), password, sizeof(password)) != ESP_OK) { // Use defaults from menuconfig ESP_LOGI(TAG, "No stored MQTT config, using defaults from menuconfig"); strlcpy(url, CONFIG_MQTT_BROKER_URL, sizeof(url)); strlcpy(username, CONFIG_MQTT_USERNAME, sizeof(username)); strlcpy(password, CONFIG_MQTT_PASSWORD, sizeof(password)); // Save defaults to NVS mqtt_client_set_broker_url(url); mqtt_client_set_credentials(username, password); } else { ESP_LOGI(TAG, "Loaded MQTT config from NVS"); } // Configure MQTT client - ESP-IDF v5+ format esp_mqtt_client_config_t mqtt_cfg = { .broker.address.uri = url, .credentials.username = username, .credentials.authentication.password = password, .credentials.client_id = MQTT_CLIENT_ID, .session.keepalive = MQTT_KEEPALIVE, .session.last_will.topic = TOPIC_LAST_WILL, .session.last_will.msg = STATUS_OFFLINE, .session.last_will.qos = MQTT_QOS_1, .session.last_will.retain = MQTT_RETAIN, .network.reconnect_timeout_ms = 10000, }; // Create MQTT client s_mqtt_client = esp_mqtt_client_init(&mqtt_cfg); if (s_mqtt_client == NULL) { ESP_LOGE(TAG, "Failed to create MQTT client"); vSemaphoreDelete(s_mqtt_mutex); s_mqtt_mutex = NULL; return ESP_FAIL; } // Register event handler esp_err_t ret = esp_mqtt_client_register_event(s_mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, s_mqtt_client); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to register event handler"); esp_mqtt_client_destroy(s_mqtt_client); s_mqtt_client = NULL; vSemaphoreDelete(s_mqtt_mutex); s_mqtt_mutex = NULL; return ret; } ESP_LOGI(TAG, "MQTT client initialized"); return ESP_OK; } esp_err_t mqtt_client_start(void) { if (s_mqtt_client == NULL) { ESP_LOGE(TAG, "MQTT client not initialized"); return ESP_ERR_INVALID_STATE; } ESP_LOGI(TAG, "Starting MQTT client..."); ESP_LOGI(TAG, "Broker URL: %s", CONFIG_MQTT_BROKER_URL); ESP_LOGI(TAG, "Client ID: %s", MQTT_CLIENT_ID); esp_err_t ret = esp_mqtt_client_start(s_mqtt_client); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to start MQTT client"); s_mqtt_state = MQTT_STATE_ERROR; return ret; } s_mqtt_state = MQTT_STATE_CONNECTING; ESP_LOGI(TAG, "MQTT client started"); return ESP_OK; } esp_err_t mqtt_client_stop(void) { if (s_mqtt_client == NULL) { return ESP_OK; } ESP_LOGI(TAG, "Stopping MQTT client..."); // Publish offline status before stopping if (s_mqtt_state == MQTT_STATE_CONNECTED) { mqtt_publish_status(STATUS_OFFLINE); vTaskDelay(100 / portTICK_PERIOD_MS); // Give time to send } esp_err_t ret = esp_mqtt_client_stop(s_mqtt_client); if (ret == ESP_OK) { s_mqtt_state = MQTT_STATE_DISCONNECTED; ESP_LOGI(TAG, "MQTT client stopped"); } return ret; } esp_err_t mqtt_client_publish(const char* topic, const char* data, int qos, int retain) { if (s_mqtt_client == NULL || s_mqtt_state != MQTT_STATE_CONNECTED) { ESP_LOGW(TAG, "MQTT client not connected"); return ESP_ERR_INVALID_STATE; } if (topic == NULL || data == NULL) { return ESP_ERR_INVALID_ARG; } xSemaphoreTake(s_mqtt_mutex, portMAX_DELAY); int msg_id = esp_mqtt_client_publish(s_mqtt_client, topic, data, strlen(data), qos, retain); xSemaphoreGive(s_mqtt_mutex); if (msg_id < 0) { ESP_LOGE(TAG, "Failed to publish to topic: %s", topic); return ESP_FAIL; } ESP_LOGD(TAG, "Published to %s: %s (msg_id: %d)", topic, data, msg_id); return ESP_OK; } esp_err_t mqtt_client_subscribe(const char* topic, int qos) { if (s_mqtt_client == NULL || s_mqtt_state != MQTT_STATE_CONNECTED) { ESP_LOGW(TAG, "MQTT client not connected"); return ESP_ERR_INVALID_STATE; } if (topic == NULL) { return ESP_ERR_INVALID_ARG; } int msg_id = esp_mqtt_client_subscribe(s_mqtt_client, topic, qos); if (msg_id < 0) { ESP_LOGE(TAG, "Failed to subscribe to topic: %s", topic); return ESP_FAIL; } ESP_LOGI(TAG, "Subscribed to topic: %s (msg_id: %d)", topic, msg_id); return ESP_OK; } esp_err_t mqtt_client_unsubscribe(const char* topic) { if (s_mqtt_client == NULL || s_mqtt_state != MQTT_STATE_CONNECTED) { ESP_LOGW(TAG, "MQTT client not connected"); return ESP_ERR_INVALID_STATE; } if (topic == NULL) { return ESP_ERR_INVALID_ARG; } int msg_id = esp_mqtt_client_unsubscribe(s_mqtt_client, topic); if (msg_id < 0) { ESP_LOGE(TAG, "Failed to unsubscribe from topic: %s", topic); return ESP_FAIL; } ESP_LOGI(TAG, "Unsubscribed from topic: %s (msg_id: %d)", topic, msg_id); return ESP_OK; } bool mqtt_client_is_connected(void) { return s_mqtt_state == MQTT_STATE_CONNECTED; } mqtt_state_t mqtt_client_get_state(void) { return s_mqtt_state; } void mqtt_client_register_callbacks(mqtt_connected_callback_t on_connected, mqtt_disconnected_callback_t on_disconnected, mqtt_data_callback_t on_data) { s_connected_callback = on_connected; s_disconnected_callback = on_disconnected; s_data_callback = on_data; } // Utility functions esp_err_t mqtt_publish_status(const char* status) { return mqtt_client_publish(TOPIC_STATUS, status, MQTT_QOS_1, MQTT_RETAIN); } esp_err_t mqtt_publish_moisture(int sensor_id, int value) { char topic[64]; char data[32]; snprintf(topic, sizeof(topic), "plant_watering/moisture/%d", sensor_id); snprintf(data, sizeof(data), "%d", value); return mqtt_client_publish(topic, data, MQTT_QOS_0, MQTT_NO_RETAIN); } esp_err_t mqtt_publish_pump_state(int pump_id, bool state) { char topic[64]; const char* state_str = state ? "on" : "off"; snprintf(topic, sizeof(topic), "plant_watering/pump/%d/state", pump_id); return mqtt_client_publish(topic, state_str, MQTT_QOS_1, MQTT_RETAIN); } // Event handler static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%ld", base, event_id); mqtt_event_handler_cb(event_data); } static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) { switch (event->event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT connected"); s_mqtt_state = MQTT_STATE_CONNECTED; // Publish online status mqtt_publish_status(STATUS_ONLINE); // Subscribe to command topics mqtt_client_subscribe(TOPIC_PUMP_1_CMD, MQTT_QOS_1); mqtt_client_subscribe(TOPIC_PUMP_2_CMD, MQTT_QOS_1); mqtt_client_subscribe(TOPIC_CONFIG, MQTT_QOS_1); // Call user callback if (s_connected_callback) { s_connected_callback(); } break; case MQTT_EVENT_DISCONNECTED: ESP_LOGW(TAG, "MQTT disconnected"); s_mqtt_state = MQTT_STATE_DISCONNECTED; // Call user callback if (s_disconnected_callback) { s_disconnected_callback(); } break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "Subscribed to topic, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "Unsubscribed from topic, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGD(TAG, "Message published, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT data received"); ESP_LOGI(TAG, "Topic: %.*s", event->topic_len, event->topic); ESP_LOGI(TAG, "Data: %.*s", event->data_len, event->data); // Call user callback if (s_data_callback) { // Null-terminate the strings for easier handling char topic[256] = {0}; char data[512] = {0}; int topic_len = event->topic_len < sizeof(topic) - 1 ? event->topic_len : sizeof(topic) - 1; int data_len = event->data_len < sizeof(data) - 1 ? event->data_len : sizeof(data) - 1; memcpy(topic, event->topic, topic_len); memcpy(data, event->data, data_len); s_data_callback(topic, data, data_len); } break; case MQTT_EVENT_ERROR: ESP_LOGE(TAG, "MQTT error"); if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { ESP_LOGE(TAG, "Last error code reported from esp-tls: 0x%x", event->error_handle->esp_tls_last_esp_err); ESP_LOGE(TAG, "Last tls stack error number: 0x%x", event->error_handle->esp_tls_stack_err); ESP_LOGE(TAG, "Last captured errno : %d (%s)", event->error_handle->esp_transport_sock_errno, strerror(event->error_handle->esp_transport_sock_errno)); } else if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) { ESP_LOGE(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code); } else { ESP_LOGW(TAG, "Unknown error type: 0x%x", event->error_handle->error_type); } s_mqtt_state = MQTT_STATE_ERROR; break; case MQTT_EVENT_BEFORE_CONNECT: ESP_LOGI(TAG, "MQTT client connecting..."); s_mqtt_state = MQTT_STATE_CONNECTING; break; default: ESP_LOGD(TAG, "Other MQTT event id: %d", event->event_id); break; } return ESP_OK; } // Configuration management functions esp_err_t mqtt_client_set_broker_url(const char* url) { nvs_handle_t nvs_handle; esp_err_t ret; ret = nvs_open(MQTT_NVS_NAMESPACE, NVS_READWRITE, &nvs_handle); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to open NVS namespace"); return ret; } ret = nvs_set_str(nvs_handle, "broker_url", url); if (ret != ESP_OK) { nvs_close(nvs_handle); return ret; } ret = nvs_commit(nvs_handle); nvs_close(nvs_handle); if (ret == ESP_OK) { ESP_LOGI(TAG, "MQTT broker URL saved to NVS"); } return ret; } esp_err_t mqtt_client_set_credentials(const char* username, const char* password) { nvs_handle_t nvs_handle; esp_err_t ret; ret = nvs_open(MQTT_NVS_NAMESPACE, NVS_READWRITE, &nvs_handle); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to open NVS namespace"); return ret; } ret = nvs_set_str(nvs_handle, "username", username); if (ret != ESP_OK) { nvs_close(nvs_handle); return ret; } ret = nvs_set_str(nvs_handle, "password", password); if (ret != ESP_OK) { nvs_close(nvs_handle); return ret; } ret = nvs_commit(nvs_handle); nvs_close(nvs_handle); if (ret == ESP_OK) { ESP_LOGI(TAG, "MQTT credentials saved to NVS"); } return ret; } esp_err_t mqtt_client_get_config(char* url, size_t url_len, char* username, size_t username_len, char* password, size_t password_len) { nvs_handle_t nvs_handle; esp_err_t ret; ret = nvs_open(MQTT_NVS_NAMESPACE, NVS_READONLY, &nvs_handle); if (ret != ESP_OK) { return ret; } ret = nvs_get_str(nvs_handle, "broker_url", url, &url_len); if (ret != ESP_OK) { nvs_close(nvs_handle); return ret; } ret = nvs_get_str(nvs_handle, "username", username, &username_len); if (ret != ESP_OK) { nvs_close(nvs_handle); return ret; } ret = nvs_get_str(nvs_handle, "password", password, &password_len); nvs_close(nvs_handle); return ret; }