大家好,这篇文章给大家介绍MQTT协议以及如何在OpenWrt系统中使用MQTT客户端和开发,并给出相关实例代码。

MQTT简介

MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,设计用于在低带宽和不稳定的网络环境中传输消息。它最初由IBM开发,用于连接远程设备和传感器到网络,并支持发布/订阅模型的消息通信。

MQTT被广泛用于物联网(IoT)领域,其中大量的设备需要进行实时通信和数据交换。它采用了一种发布/订阅(publish/subscribe)模型,其中消息的发送者(发布者)将消息发布到特定的主题(topic),而订阅者可以选择性地订阅感兴趣的主题,以接收相应的消息。

MQTT特点

以下是MQTT的一些关键特点:

轻量级:MQTT的设计非常轻量,协议头部非常小,传输的数据量很小,适用于带宽有限的网络环境,如低速、高延迟或不稳定的网络。

简单:MQTT的协议规范相对简单,易于实现和部署。它定义了少量的消息类型和协议操作,使得开发人员可以快速上手。

异步通信:MQTT使用异步通信模式,发布者发送消息后,不需要等待接收者的响应,可以继续执行其他操作。这种异步通信模式适合在资源有限的设备和网络中工作。

可靠性:MQTT支持三种不同的消息传递质量(QoS)级别:QoS 0(至多一次),QoS 1(至少一次)和QoS 2(只有一次)。这使得可以根据应用程序的要求选择适当的消息交付保证级别。

网络状况适应性:MQTT可以适应不稳定的网络状况,如网络中断、重连等。它具有断开连接后自动重连的机制,可以确保消息的可靠传输。

订阅和发布模型

Publisher(发布者):发布者是消息的发送者,它将消息发布到特定的主题(topic)上。可以有一个或多个发布者。

Subscriber(订阅者):订阅者是对消息感兴趣的实体,它选择性地订阅一个或多个主题。一旦订阅了主题,它就会接收到相应的消息。

MQTT Broker(MQTT代理):MQTT代理是中间件,负责接收发布者发送的消息,并将其路由到对应的订阅者。它维护着主题和订阅关系的注册表,并确保消息的可靠传递。

工作流程如下:

发布者将消息发布到特定的主题上。

MQTT代理接收到消息后,根据订阅者的注册信息,将消息路由到对应的订阅者。

订阅者接收到发布者发布的消息,并进行相应的处理。

通过发布/订阅模型,MQTT允许实现解耦和灵活性,发布者和订阅者之间不需要直接的点对点连接,而是通过MQTT代理进行中转和路由。这种模型非常适合在物联网中进行大规模设备间的通信和数据交换。

MQTT QoS

MQTT(Message Queuing Telemetry Transport)协议支持三种不同的QoS(Quality of Service)级别,用于控制消息的可靠性和传输保证。以下是MQTT的三个QoS级别:

QoS 0(至多一次):

在QoS 0级别下,消息以“至多一次”传输,没有确认机制。 消息被发布后,发布者不会接收到关于消息是否成功传输或交付的确认。 MQTT代理会尽最大努力将消息传输给订阅者,但可能会出现消息丢失或重复的情况。 此级别适用于对消息传输的可靠性要求不高的场景,如传感器数据的临时更新等。

QoS 1(至少一次):

在QoS 1级别下,消息以“至少一次”传输,确保至少传输一次。 发布者发送消息后,会等待MQTT代理发送确认消息(PUBACK)来确认消息的接收。 如果发布者没有收到确认消息,它会再次发送相同的消息,直到收到确认为止。 MQTT代理会确保消息至少传输一次给订阅者,但可能会出现重复传输的情况。 此级别适用于对消息传输的可靠性要求较高的场景,如控制指令的传递。

QoS 2(只有一次):

在QoS 2级别下,消息以“只有一次”传输,确保仅传输一次。 发布者发送消息后,会等待MQTT代理发送两个确认消息(PUBREC和PUBCOMP)来确认消息的接收和完成。 MQTT代理会确保消息仅传输一次给订阅者,没有重复传输的情况。 此级别提供了最高的消息传输可靠性,但也伴随着更高的网络开销。 此级别适用于对消息传输的可靠性要求非常高的场景,如金融交易或严格的数据同步。 选择合适的QoS级别取决于应用程序对消息传输可靠性和网络开销的要求。更高的QoS级别提供了更可靠的传输,但同时也增加了网络开销。因此,需要根据具体场景的需求来选择适当的级别。

OpenWrt中使用mosquitto

插件安装

默认是没有包含mosquitto客户端和broker的,我们可以手动安装相关插件,为了测试我们需要安装broker和client
首先更新openwrt软件源

opkg update

然后调用以下命令分别安装mosquitto broker和client,这里我们选用nossl版本,也就是不需要ssl加密,方便测试

opkg install mosquitto-nossl
opkg install mosquitto-client-nossl

mosquitto服务

安装完成后就可以使用broker和client了,首先我们需要启动mosquitto broker服务,

mosquitto broker服务配置文件在/etc/mosquitto/目录中,我们可以修改服务器相关信息,比如监听端口号、接口、ip地址等。

root@OpenWrt:~# ls /etc/mosquitto/mosquitto.conf 
/etc/mosquitto/mosquitto.conf
root@OpenWrt:~# 

mosquitto客户端

mosquitto客户端包含sub和pub两部分,分别用于订阅和发布

订阅主题:

mosquitto_sub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic>

其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要订阅的主题名称。

示例:

mosquitto_sub -h 192.168.1.1 -p 1883 -t test/topic

发布主题:

mosquitto_pub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic> -m <Message>

其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要发布的主题名称,是要发布的消息内容。

示例:

mosquitto_pub -h 192.168.1.1 -p 1883 -t test/topic -m "Hello, MQTT!"

运行结果:
由于订阅和发布客户端都在本地,ip使用localhost地址127.0.0.1

root@OpenWrt:~# mosquitto_sub -h 127.0.0.1 -p 1883 -t test/topic &
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t test/topic -m "hello MQTT."
root@OpenWrt:~# hello MQTT.

在订阅主题时,我们还可以使用通配符,最常用就是通配符”#”,通过”#”可以匹配多级topic
比如订阅了主题”test/#”,则可以收到”test/”开头的所有topic,比如”test/topic1″、”test/hello”等
以下实例中分别订阅了”test/#”和”test/topic1″,当发布”test/topic1″消息时,二者都可以收到,而发布”test/topic2″时只有一个可以收到。

除了通配符”#”之外,还有”$”、”+”等通配符,不在这里详解。

使用云端公共broker测试
emqx提供了公共免费的broker供开发者测试,注意不要在生产环境使用,仅供测试
我们可以准备两台不同的设备,都连接broker.emqx.io,这两台设备可以在不同区域,通过公网broker可以轻松实现两台设备通信。

  • 客户端1
    客户端1订阅openwrt/topic消息
mosquitto_sub -h broker.emqx.io -p 1883 -i client_001  -t openwrt/topic
  • 客户端2
    发送一条消息到主题openwrt/topic,这样客户端1就可以收到该消息
mosquitto_pub -h broker.emqx.io -p 1883 -t openwrt/topic -i client_002 -m "hello client1, i am froms client2"  

OpenWrt中基于libmosquitto开发

前面给大家演示了mosquitto客户端的使用,但命令行客户端仅供测试使用,我们在开发过程中需要自定义消息并且能够实时解析消息,而通过命令行就没那么方便消息的处理了,需要调用mosquitto底层api接口实现想要的功能。

libmosquitto库

在openwrt系统中默认集成了mosquitto库,可以直接依赖调用。
对应依赖的库为:
libmosquitto-nossl 不支持ssl加密
libmosquitto 支持ssl加密

api接口详解

mosquitto_lib_init:初始化libmosquitto库。在使用其他libmosquitto函数之前,应该首先调用此函数。

mosquitto_lib_version:获取libmosquitto库的版本号信息。

mosquitto_new:创建一个新的mosquitto对象(MQTT客户端)。

mosquitto_connect:与MQTT代理服务器建立连接。

mosquitto_disconnect:断开与MQTT代理服务器的连接。

mosquitto_publish:向指定主题发布消息。

mosquitto_subscribe:订阅一个或多个主题。

mosquitto_unsubscribe:取消订阅一个或多个主题。

mosquitto_loop_start:启动一个线程来处理MQTT消息循环。

mosquitto_loop_forever:开始一个阻塞的循环,处理MQTT消息。

mosquitto_loop:在非阻塞模式下处理MQTT消息。

mosquitto_message_callback_set:设置用于接收订阅消息的回调函数。

mosquitto_username_pw_set:设置连接时使用的用户名和密码。

mosquitto_tls_set:为MQTT连接启用SSL/TLS加密。

mosquitto_tls_opts_set:设置SSL/TLS选项,如CA证书、客户端证书和私钥等。

mosquitto_tls_insecure_set:设置是否允许SSL/TLS连接中的不安全选项。

mosquitto_will_set:设置遗嘱消息,即在客户端异常断开时发布的消息。

基于libmosquitto实现一个消息订阅程序

源码

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
#include <sys/time.h>
#include <sys/sysinfo.h>

struct mosquitto *g_test_mosq = NULL;

void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
	printf("connect to mqtt server ok\n");
	if (MOSQ_ERR_SUCCESS != mosquitto_subscribe(mosq, NULL, "openwrt/#", 0))
	{
		printf("sub topic openwrt/# failed...\n");
	}
	else{
		printf("sub topic openwrt/# failed...\n");
	}
}

void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
{
	if (result)
		printf("disconnect %s\n", mosquitto_connack_string(result));
	else
		printf("disconnect from mqtt server.\n");
}

void mqtt_sub_callback(struct mosquitto *mosq,
						  void *userdata,
						  int mid,
						  int qos_count,
						  const int *granted_qos)
{
	printf("sub callback\n");
}

void mqtt_msg_callback(struct mosquitto *mosq,
						  void *userdata,
						  struct mosquitto_message *message)
{
	printf("callback recv mqtt msg, topic = %s, payload = %s\n", message->topic, message->payload);
}

struct mosquitto *connect_to_mqtt_server(char *server_ip)
{
	struct mosquitto *mosq = NULL;
	int rc;
	char mqtt_user[128] = {0};
	char mqtt_pwd[128] = {0};
	char client_id[128] = {0};
    struct timeval tv;
    gettimeofday(&tv, NULL);
	mosquitto_lib_init();
	snprintf(client_id, sizeof(client_id), "test_%d", tv.tv_sec);
	printf("connect to mqtt server..client_id=%s\n", client_id);
	mosq = mosquitto_new(client_id, true, NULL);

	if (!mosq)
	{
		return NULL;
	}
#if 0
	rc = mosquitto_username_pw_set(mosq, "test", "test");
	if (rc)
	{
		mosquitto_destroy(mosq);
		return NULL;
	}
#endif

	mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
	mosquitto_message_callback_set(mosq, mqtt_msg_callback);
	mosquitto_subscribe_callback_set(mosq, mqtt_sub_callback);
	mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback);
	rc = mosquitto_connect(mosq, server_ip, 1883, 30);
	if (rc)
	{
		printf("Unable to connect mqtt server rc=%d\n", rc);
		mosquitto_destroy(mosq);
		return NULL;
	}
	return mosq;
}

int mqtt_bcast_msg(char *api, char *data, int len)
{
	char topic[128] = {0};
	int mid;
	if (!api || !data || len == 0)
		return -1;
	if (!g_test_mosq)
		return -1;
	sprintf(topic, "openwrt/%s", api);
	return mosquitto_publish(g_test_mosq, &mid, topic, len, data, 0, 0);
}

int main(int argc, char *argv[]){
	char *host = NULL;
	if (argc < 2){
		host = "127.0.0.1";
		printf("use default ip: 127.0.0.1\n");
	}
	else{
		host = argv[1];
		printf("use ip: %s\n", host);
	}
	g_test_mosq = connect_to_mqtt_server(host);
	if (!g_test_mosq){
		printf("connect to server %s failed\n", host);
		exit(0);
	}
	mosquitto_loop_forever(g_test_mosq, -1, 1);
	mosquitto_destroy(g_test_mosq);
	mosquitto_lib_cleanup();
	return 0;
}

实例源码编译
将源码包拷贝到openwrt源码package目录
开启mqtt_test宏并生成默认依赖配置

echo "CONFIG_PACKAGE_mqtt_test=y" >>.config
make defconfig

编译

make package/mqtt_test/compile V=s

插件安装:
mqtt_test依赖了libmosquitto库,而libmosquitto依赖了libcares,所以需要安装三个插件

  • libcares
  • libmosquitto-nossl
  • mqtt_test

将插件通过winscp或其他工具上传到openwrt系统中,执行以下命令安装(以X86为例)

opkg install libcares_1.18.1-1_x86_64.ipk
opkg install libmosquitto-nossl_2.0.15-1_x86_64.ipk
opkg install mqtt_test_1.0-1_x86_64.ipk

运行:

mqtt_test默认连接本地broker,也可以指定ip

运行如果出现错误,表示服务器没有启动或者参数异常,请先确认mosquitto服务已经启动。

use default ip: 127.0.0.1
connect to mqtt server..client_id=test_1686993389
Unable to connect mqtt server rc=14
connect to server 127.0.0.1 failed

运行成功

root@OpenWrt:~# mqtt_test 
use default ip: 127.0.0.1
connect to mqtt server..client_id=test_1686993598
connect to mqtt server ok
sub callback

现在就启动了一个mqtt客户端,订阅了openwrt/#
通过mosquitto_pub工具可以发送指令到该客户端,客户端当前处理方式是输出收到的消息,当然实际开发是解析指令并执行对应的命令,比如接收到reboot命令后执行重启。

pub命令如下:

root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "hello openwrt"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"

客户端输出如下:

callback recv mqtt msg, topic = openwrt/send_msg, payload = hello openwrt
callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot
callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot

如果客户端连接云端的broker,就可以实现远程操作设备,比如远程重启设备、配置下发等。

总结

在物联网开发中我们会经常用的MQTT协议,常见的就是边缘设备和云端通信,上报实时状态、远程管理等,当然也可以局域网间通信,实现节点间通信,比如可以通过MQTT协议实现mesh数据同步、AC集中管理等,有了MQTT协议我们不需要自己通过底层socket实现私有协议,可以只关注业务处理,可大大提高程序稳定性。 当然MQTT也有一些缺陷,就是订阅发布模型实现实时回复消息比较麻烦,不适合做一些状态较复杂的交互。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。