Added MQTT
This commit is contained in:
457
main/plant_mqtt.c
Normal file
457
main/plant_mqtt.c
Normal file
@ -0,0 +1,457 @@
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include "freertos/FreeRTOS.h"
|
||||
#include "freertos/task.h"
|
||||
#include "freertos/semphr.h"
|
||||
#include "esp_system.h"
|
||||
#include "esp_log.h"
|
||||
#include "nvs_flash.h"
|
||||
#include "nvs.h"
|
||||
#include "plant_mqtt.h"
|
||||
#include "mqtt_client.h" // ESP-IDF MQTT client header
|
||||
|
||||
static const char *TAG = "MQTT_CLIENT";
|
||||
|
||||
// NVS namespace for MQTT settings
|
||||
#define MQTT_NVS_NAMESPACE "mqtt_config"
|
||||
|
||||
// MQTT client handle
|
||||
static esp_mqtt_client_handle_t s_mqtt_client = NULL;
|
||||
|
||||
// Current state
|
||||
static mqtt_state_t s_mqtt_state = MQTT_STATE_DISCONNECTED;
|
||||
|
||||
// Callbacks
|
||||
static mqtt_connected_callback_t s_connected_callback = NULL;
|
||||
static mqtt_disconnected_callback_t s_disconnected_callback = NULL;
|
||||
static mqtt_data_callback_t s_data_callback = NULL;
|
||||
|
||||
// Mutex for thread safety
|
||||
static SemaphoreHandle_t s_mqtt_mutex = NULL;
|
||||
|
||||
// Forward declarations
|
||||
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
|
||||
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event);
|
||||
|
||||
esp_err_t mqtt_client_init(void)
|
||||
{
|
||||
if (s_mqtt_client != NULL) {
|
||||
ESP_LOGW(TAG, "MQTT client already initialized");
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
// Create mutex
|
||||
s_mqtt_mutex = xSemaphoreCreateMutex();
|
||||
if (s_mqtt_mutex == NULL) {
|
||||
ESP_LOGE(TAG, "Failed to create mutex");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
// Try to load credentials from NVS
|
||||
char url[128] = {0};
|
||||
char username[64] = {0};
|
||||
char password[64] = {0};
|
||||
|
||||
if (mqtt_client_get_config(url, sizeof(url), username, sizeof(username),
|
||||
password, sizeof(password)) != ESP_OK) {
|
||||
// Use defaults from menuconfig
|
||||
ESP_LOGI(TAG, "No stored MQTT config, using defaults from menuconfig");
|
||||
strlcpy(url, CONFIG_MQTT_BROKER_URL, sizeof(url));
|
||||
strlcpy(username, CONFIG_MQTT_USERNAME, sizeof(username));
|
||||
strlcpy(password, CONFIG_MQTT_PASSWORD, sizeof(password));
|
||||
|
||||
// Save defaults to NVS
|
||||
mqtt_client_set_broker_url(url);
|
||||
mqtt_client_set_credentials(username, password);
|
||||
} else {
|
||||
ESP_LOGI(TAG, "Loaded MQTT config from NVS");
|
||||
}
|
||||
|
||||
// Configure MQTT client - ESP-IDF v5+ format
|
||||
esp_mqtt_client_config_t mqtt_cfg = {
|
||||
.broker.address.uri = url,
|
||||
.credentials.username = username,
|
||||
.credentials.authentication.password = password,
|
||||
.credentials.client_id = MQTT_CLIENT_ID,
|
||||
.session.keepalive = MQTT_KEEPALIVE,
|
||||
.session.last_will.topic = TOPIC_LAST_WILL,
|
||||
.session.last_will.msg = STATUS_OFFLINE,
|
||||
.session.last_will.qos = MQTT_QOS_1,
|
||||
.session.last_will.retain = MQTT_RETAIN,
|
||||
.network.reconnect_timeout_ms = 10000,
|
||||
};
|
||||
|
||||
// Create MQTT client
|
||||
s_mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
|
||||
if (s_mqtt_client == NULL) {
|
||||
ESP_LOGE(TAG, "Failed to create MQTT client");
|
||||
vSemaphoreDelete(s_mqtt_mutex);
|
||||
s_mqtt_mutex = NULL;
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
// Register event handler
|
||||
esp_err_t ret = esp_mqtt_client_register_event(s_mqtt_client, ESP_EVENT_ANY_ID,
|
||||
mqtt_event_handler, s_mqtt_client);
|
||||
if (ret != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to register event handler");
|
||||
esp_mqtt_client_destroy(s_mqtt_client);
|
||||
s_mqtt_client = NULL;
|
||||
vSemaphoreDelete(s_mqtt_mutex);
|
||||
s_mqtt_mutex = NULL;
|
||||
return ret;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "MQTT client initialized");
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
esp_err_t mqtt_client_start(void)
|
||||
{
|
||||
if (s_mqtt_client == NULL) {
|
||||
ESP_LOGE(TAG, "MQTT client not initialized");
|
||||
return ESP_ERR_INVALID_STATE;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "Starting MQTT client...");
|
||||
ESP_LOGI(TAG, "Broker URL: %s", CONFIG_MQTT_BROKER_URL);
|
||||
ESP_LOGI(TAG, "Client ID: %s", MQTT_CLIENT_ID);
|
||||
|
||||
esp_err_t ret = esp_mqtt_client_start(s_mqtt_client);
|
||||
if (ret != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to start MQTT client");
|
||||
s_mqtt_state = MQTT_STATE_ERROR;
|
||||
return ret;
|
||||
}
|
||||
|
||||
s_mqtt_state = MQTT_STATE_CONNECTING;
|
||||
ESP_LOGI(TAG, "MQTT client started");
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
esp_err_t mqtt_client_stop(void)
|
||||
{
|
||||
if (s_mqtt_client == NULL) {
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "Stopping MQTT client...");
|
||||
|
||||
// Publish offline status before stopping
|
||||
if (s_mqtt_state == MQTT_STATE_CONNECTED) {
|
||||
mqtt_publish_status(STATUS_OFFLINE);
|
||||
vTaskDelay(100 / portTICK_PERIOD_MS); // Give time to send
|
||||
}
|
||||
|
||||
esp_err_t ret = esp_mqtt_client_stop(s_mqtt_client);
|
||||
if (ret == ESP_OK) {
|
||||
s_mqtt_state = MQTT_STATE_DISCONNECTED;
|
||||
ESP_LOGI(TAG, "MQTT client stopped");
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
esp_err_t mqtt_client_publish(const char* topic, const char* data, int qos, int retain)
|
||||
{
|
||||
if (s_mqtt_client == NULL || s_mqtt_state != MQTT_STATE_CONNECTED) {
|
||||
ESP_LOGW(TAG, "MQTT client not connected");
|
||||
return ESP_ERR_INVALID_STATE;
|
||||
}
|
||||
|
||||
if (topic == NULL || data == NULL) {
|
||||
return ESP_ERR_INVALID_ARG;
|
||||
}
|
||||
|
||||
xSemaphoreTake(s_mqtt_mutex, portMAX_DELAY);
|
||||
|
||||
int msg_id = esp_mqtt_client_publish(s_mqtt_client, topic, data, strlen(data), qos, retain);
|
||||
|
||||
xSemaphoreGive(s_mqtt_mutex);
|
||||
|
||||
if (msg_id < 0) {
|
||||
ESP_LOGE(TAG, "Failed to publish to topic: %s", topic);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
ESP_LOGD(TAG, "Published to %s: %s (msg_id: %d)", topic, data, msg_id);
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
esp_err_t mqtt_client_subscribe(const char* topic, int qos)
|
||||
{
|
||||
if (s_mqtt_client == NULL || s_mqtt_state != MQTT_STATE_CONNECTED) {
|
||||
ESP_LOGW(TAG, "MQTT client not connected");
|
||||
return ESP_ERR_INVALID_STATE;
|
||||
}
|
||||
|
||||
if (topic == NULL) {
|
||||
return ESP_ERR_INVALID_ARG;
|
||||
}
|
||||
|
||||
int msg_id = esp_mqtt_client_subscribe(s_mqtt_client, topic, qos);
|
||||
if (msg_id < 0) {
|
||||
ESP_LOGE(TAG, "Failed to subscribe to topic: %s", topic);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "Subscribed to topic: %s (msg_id: %d)", topic, msg_id);
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
esp_err_t mqtt_client_unsubscribe(const char* topic)
|
||||
{
|
||||
if (s_mqtt_client == NULL || s_mqtt_state != MQTT_STATE_CONNECTED) {
|
||||
ESP_LOGW(TAG, "MQTT client not connected");
|
||||
return ESP_ERR_INVALID_STATE;
|
||||
}
|
||||
|
||||
if (topic == NULL) {
|
||||
return ESP_ERR_INVALID_ARG;
|
||||
}
|
||||
|
||||
int msg_id = esp_mqtt_client_unsubscribe(s_mqtt_client, topic);
|
||||
if (msg_id < 0) {
|
||||
ESP_LOGE(TAG, "Failed to unsubscribe from topic: %s", topic);
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "Unsubscribed from topic: %s (msg_id: %d)", topic, msg_id);
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
bool mqtt_client_is_connected(void)
|
||||
{
|
||||
return s_mqtt_state == MQTT_STATE_CONNECTED;
|
||||
}
|
||||
|
||||
mqtt_state_t mqtt_client_get_state(void)
|
||||
{
|
||||
return s_mqtt_state;
|
||||
}
|
||||
|
||||
void mqtt_client_register_callbacks(mqtt_connected_callback_t on_connected,
|
||||
mqtt_disconnected_callback_t on_disconnected,
|
||||
mqtt_data_callback_t on_data)
|
||||
{
|
||||
s_connected_callback = on_connected;
|
||||
s_disconnected_callback = on_disconnected;
|
||||
s_data_callback = on_data;
|
||||
}
|
||||
|
||||
// Utility functions
|
||||
esp_err_t mqtt_publish_status(const char* status)
|
||||
{
|
||||
return mqtt_client_publish(TOPIC_STATUS, status, MQTT_QOS_1, MQTT_RETAIN);
|
||||
}
|
||||
|
||||
esp_err_t mqtt_publish_moisture(int sensor_id, int value)
|
||||
{
|
||||
char topic[64];
|
||||
char data[32];
|
||||
|
||||
snprintf(topic, sizeof(topic), "plant_watering/moisture/%d", sensor_id);
|
||||
snprintf(data, sizeof(data), "%d", value);
|
||||
|
||||
return mqtt_client_publish(topic, data, MQTT_QOS_0, MQTT_NO_RETAIN);
|
||||
}
|
||||
|
||||
esp_err_t mqtt_publish_pump_state(int pump_id, bool state)
|
||||
{
|
||||
char topic[64];
|
||||
const char* state_str = state ? "on" : "off";
|
||||
|
||||
snprintf(topic, sizeof(topic), "plant_watering/pump/%d/state", pump_id);
|
||||
|
||||
return mqtt_client_publish(topic, state_str, MQTT_QOS_1, MQTT_RETAIN);
|
||||
}
|
||||
|
||||
// Event handler
|
||||
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
|
||||
{
|
||||
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%ld", base, event_id);
|
||||
mqtt_event_handler_cb(event_data);
|
||||
}
|
||||
|
||||
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
|
||||
{
|
||||
switch (event->event_id) {
|
||||
case MQTT_EVENT_CONNECTED:
|
||||
ESP_LOGI(TAG, "MQTT connected");
|
||||
s_mqtt_state = MQTT_STATE_CONNECTED;
|
||||
|
||||
// Publish online status
|
||||
mqtt_publish_status(STATUS_ONLINE);
|
||||
|
||||
// Subscribe to command topics
|
||||
mqtt_client_subscribe(TOPIC_PUMP_1_CMD, MQTT_QOS_1);
|
||||
mqtt_client_subscribe(TOPIC_PUMP_2_CMD, MQTT_QOS_1);
|
||||
mqtt_client_subscribe(TOPIC_CONFIG, MQTT_QOS_1);
|
||||
|
||||
// Call user callback
|
||||
if (s_connected_callback) {
|
||||
s_connected_callback();
|
||||
}
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_DISCONNECTED:
|
||||
ESP_LOGW(TAG, "MQTT disconnected");
|
||||
s_mqtt_state = MQTT_STATE_DISCONNECTED;
|
||||
|
||||
// Call user callback
|
||||
if (s_disconnected_callback) {
|
||||
s_disconnected_callback();
|
||||
}
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_SUBSCRIBED:
|
||||
ESP_LOGI(TAG, "Subscribed to topic, msg_id=%d", event->msg_id);
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_UNSUBSCRIBED:
|
||||
ESP_LOGI(TAG, "Unsubscribed from topic, msg_id=%d", event->msg_id);
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_PUBLISHED:
|
||||
ESP_LOGD(TAG, "Message published, msg_id=%d", event->msg_id);
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_DATA:
|
||||
ESP_LOGI(TAG, "MQTT data received");
|
||||
ESP_LOGI(TAG, "Topic: %.*s", event->topic_len, event->topic);
|
||||
ESP_LOGI(TAG, "Data: %.*s", event->data_len, event->data);
|
||||
|
||||
// Call user callback
|
||||
if (s_data_callback) {
|
||||
// Null-terminate the strings for easier handling
|
||||
char topic[256] = {0};
|
||||
char data[512] = {0};
|
||||
|
||||
int topic_len = event->topic_len < sizeof(topic) - 1 ? event->topic_len : sizeof(topic) - 1;
|
||||
int data_len = event->data_len < sizeof(data) - 1 ? event->data_len : sizeof(data) - 1;
|
||||
|
||||
memcpy(topic, event->topic, topic_len);
|
||||
memcpy(data, event->data, data_len);
|
||||
|
||||
s_data_callback(topic, data, data_len);
|
||||
}
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_ERROR:
|
||||
ESP_LOGE(TAG, "MQTT error");
|
||||
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
|
||||
ESP_LOGE(TAG, "Last error code reported from esp-tls: 0x%x", event->error_handle->esp_tls_last_esp_err);
|
||||
ESP_LOGE(TAG, "Last tls stack error number: 0x%x", event->error_handle->esp_tls_stack_err);
|
||||
ESP_LOGE(TAG, "Last captured errno : %d (%s)", event->error_handle->esp_transport_sock_errno,
|
||||
strerror(event->error_handle->esp_transport_sock_errno));
|
||||
} else if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
|
||||
ESP_LOGE(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code);
|
||||
} else {
|
||||
ESP_LOGW(TAG, "Unknown error type: 0x%x", event->error_handle->error_type);
|
||||
}
|
||||
s_mqtt_state = MQTT_STATE_ERROR;
|
||||
break;
|
||||
|
||||
case MQTT_EVENT_BEFORE_CONNECT:
|
||||
ESP_LOGI(TAG, "MQTT client connecting...");
|
||||
s_mqtt_state = MQTT_STATE_CONNECTING;
|
||||
break;
|
||||
|
||||
default:
|
||||
ESP_LOGD(TAG, "Other MQTT event id: %d", event->event_id);
|
||||
break;
|
||||
}
|
||||
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
// Configuration management functions
|
||||
esp_err_t mqtt_client_set_broker_url(const char* url)
|
||||
{
|
||||
nvs_handle_t nvs_handle;
|
||||
esp_err_t ret;
|
||||
|
||||
ret = nvs_open(MQTT_NVS_NAMESPACE, NVS_READWRITE, &nvs_handle);
|
||||
if (ret != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to open NVS namespace");
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_set_str(nvs_handle, "broker_url", url);
|
||||
if (ret != ESP_OK) {
|
||||
nvs_close(nvs_handle);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_commit(nvs_handle);
|
||||
nvs_close(nvs_handle);
|
||||
|
||||
if (ret == ESP_OK) {
|
||||
ESP_LOGI(TAG, "MQTT broker URL saved to NVS");
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
esp_err_t mqtt_client_set_credentials(const char* username, const char* password)
|
||||
{
|
||||
nvs_handle_t nvs_handle;
|
||||
esp_err_t ret;
|
||||
|
||||
ret = nvs_open(MQTT_NVS_NAMESPACE, NVS_READWRITE, &nvs_handle);
|
||||
if (ret != ESP_OK) {
|
||||
ESP_LOGE(TAG, "Failed to open NVS namespace");
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_set_str(nvs_handle, "username", username);
|
||||
if (ret != ESP_OK) {
|
||||
nvs_close(nvs_handle);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_set_str(nvs_handle, "password", password);
|
||||
if (ret != ESP_OK) {
|
||||
nvs_close(nvs_handle);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_commit(nvs_handle);
|
||||
nvs_close(nvs_handle);
|
||||
|
||||
if (ret == ESP_OK) {
|
||||
ESP_LOGI(TAG, "MQTT credentials saved to NVS");
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
esp_err_t mqtt_client_get_config(char* url, size_t url_len,
|
||||
char* username, size_t username_len,
|
||||
char* password, size_t password_len)
|
||||
{
|
||||
nvs_handle_t nvs_handle;
|
||||
esp_err_t ret;
|
||||
|
||||
ret = nvs_open(MQTT_NVS_NAMESPACE, NVS_READONLY, &nvs_handle);
|
||||
if (ret != ESP_OK) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_get_str(nvs_handle, "broker_url", url, &url_len);
|
||||
if (ret != ESP_OK) {
|
||||
nvs_close(nvs_handle);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_get_str(nvs_handle, "username", username, &username_len);
|
||||
if (ret != ESP_OK) {
|
||||
nvs_close(nvs_handle);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = nvs_get_str(nvs_handle, "password", password, &password_len);
|
||||
nvs_close(nvs_handle);
|
||||
|
||||
return ret;
|
||||
}
|
||||
Reference in New Issue
Block a user