MQTT协议与EMQX

1. MQTT协议介绍

MQTT是基于publish/subscribe 模式的物联网通信协议,凭借简单容易实现,支持QoS、报文小等特点,占据物联网协议的半壁江山

mqtt官网:https://mqtt.org/

mqtt中文网:https://mqtt.p2hp.com/

1.1 MQTT简介

MQTT是基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它使用范围非常广泛,在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

1.2 MQTT协议设计规范

由于物联网的环境是非常特别的,所以MQTT遵循一下设计原则:

  1. 精简,不添加可有可无的功能;
  2. 发布/订阅(pub/sub)模式,方便消息在传感器之间传递,解耦client/server模式,带来的好处在于不必预先知道对方的存在(ip/port),不必同时运行;
  3. 允许用户动态创建主题(不需要预先创建主题),零运营成本;
  4. 把传输量降到最低以提高传输效率;
  5. 把低带宽、高延迟、不稳定的网络等因素考虑在内;
  6. 支持连续的会话保持和控制(心跳);
  7. 理解客户端计算能力可能很低;
  8. 提供服务质量(quality of service level:QoS)管理;
  9. 不强求传输数据的类型与格式,保持灵活性(指的是应用层业务数据);

1.3 MQTT协议主要特性

MQTT协议工作在低带宽,不可靠的网络在禹城传感器和控制设备通讯而设计的协议,它具有一下主要的几项特性:

  1. 开放消息协议,简单易实现。
  2. 使用发布/订阅消息模式,提供一对多的消息发布,解出应用程序耦合。
  3. 对负载(协议携带的应用数据)内容屏蔽的消息传输。
  4. 基于TCP/IP网络连接,提供有序,无损,双向连接。

主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

5. 消息服务质量(QoS)支持,可靠传输保证;有三种消息发布服务质量;

QoS0:“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

QoS1:“至少一次”,确保消息到达,但消息重复可能会发生。

QoS2:“只有一次”,确保消息到达一次,在一些要求比较严格的计费系统中,可以使用此级别,在计费系统中,消息重复或丢失会导致不正确的结果。这种高质量的消息发布服务还可以用于即时通讯的APP的推送,确保用户收到且只会收到一次。

  1. 1直接固定表头,2直接心跳报文,最小化传输开销和协议交换,有效减少网络流量。

这就是为什么介绍里说它非常适合在物联网领域,传感器与服务器的通讯,消息的收集,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传输消息再合适不过了。

  1. 在线状态感知:使用Last Will 和 Testament 特性通知有关个方客户端异常中断的机制。

Last Will:即遗言机制,用于通知同一主题下的其他设备,发送遗言的设备已经断开了连接。

Testament:遗嘱机制,功能类似于Last Will。

1.4 MQTT协议应用领域

MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。

  • 物联网M2M通信,物联网大数据采集
  • Android消息推送,WEB消息推送
  • 移动即时消息,例如Facebook Messenger
  • 智能硬件、智能家具、智能电器
  • 车联网通信,电动车站桩采集
  • 智慧城市、远程医疗、远程教育
  • 电力、石油与能源等行业市场

2. MQTT协议原理

2.1 MQTT协议实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(publish)、代理(broker)、订阅者(subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息氛围:主题(topic)和负载(payload)两部分:

  1. Topic:可以理解为消息的类型,订阅者订阅(subscribe)后,就会收到该主题的消息内容(payload)。
  2. payload:可以理解为消息的内容,是指订阅者具体要使用的内容。

2.2 网络传输与应用消息

MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。

当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关联。

2.3 MQTT客户端

一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

  1. 发布其他客户端可能会订阅的消息;
  2. 订阅其他客户端发布的消息;
  3. 退订或删除应用程序的消息;
  4. 断卡与服务器的连接;

2.4 MQTT服务器端

MQTT服务器以称为消息代理(broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

  1. 接受来自客户的网络连接;
  2. 接受客户发布的应用消息;
  3. 处理来自客户端的订阅和退订请求;
  4. 向订阅的客户转发应用程序消息。

2.5 发布/订阅、主题、会话

MQTT是基于**发布(publish)/订阅(subscribe)模式来进行通信以及数据交换的,与HTTP的请求(request)/应答(response)**的模式有本质的不同。

订阅者(subscriber)会向消息服务器(broker)订阅一个主题(topic)。成功订阅后,消息服务器会将该主题下的消息转发给所有的订阅者。

主题(topic)以/未分隔符区分不同的层级。包含通配符+#的主题有称为主题过滤器(topic filter);不含通配符的称为主题名(topic Names),例如:

chat/room/1

sensor/10/temperature

sensor/+/temperature

$SYS/broker/metrice/packets/received

$SYS/broker/metrics/#
'+':表示通配一个层级,例如a/+,通配a/x, a/y

'#':表示通配多个层级,例如a/#,通配a/x, a/b/c/d

注意:'+' 通配一个层级,'#' 通配多个层级(必须在末尾)

发布者(publisher)只能向主题名发布消息,订阅者(subscriber)则可以通过订阅主题过滤器来通配多个主体名称。

会话(session)

每个客户端与服务器建立连接后就是一个会话,客户端与服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

2.6 MQTT协议中的方法

MQTT协议中定义了一些方法(也被称为动作),用于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成的数据,这取决于服务器的实现,通常来说,资源指服务器上的文件或输出,主要方法有:

  1. connect:客户端连接到服务器
  2. connack:连接确认
  3. publish:发布消息
  4. puback:发布确认
  5. pubrec:发布的消息已接收
  6. pubrel:发布的消息已释放
  7. pubcomp:发布完成
  8. subscribe:订阅请求
  9. suback:订阅确认
  10. unsubscribe:取消订阅
  11. unsuback:取消订阅确认
  12. pingreq:客户端发送心跳
  13. qingresp:服务端心跳响应
  14. disconnect:断开连接
  15. auth:认证

3. MQTT协议数据包结构

官方文档中对于MQTT协议包的结构有着具体的说明:https://mqtt.org/mqtt-specification/

在MQTT协议中,一个MQTT数据包由:固定头(fixed header)、可变头(variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

  1. 固定头(fixed header)。存在于所有MQTT数据包中,表示数据包类型以及数据包的分组类标识,如连接,发布,订阅,心跳等。其中固定头是必须得,所有类型的MQTT协议中,都必须包含固定头。
  2. 可变头(variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在以及其中具体内容。可变头部是不可选的意思,而是指这部分在有些协议类型中存在,在有些协议中不存在。
  3. 消息体(payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。与可变头一样,在一些协议类型中有消息内容,有些协议类型中没有消息内容。

3.1 固定头(fixed header)

image-20230816145112927

固定头存在于所有MQTT数据包中,固定头包含两部分内容,首字节(字节1)和剩余消息报文长度(从第二个字节开始,长度为1-4字节)剩余长度是当前包中剩余内容长度的字节数,包括变量头和有效负载中的数据)。剩余长度不包含用来编码剩余长度的字节。

剩余长度使用了一种可变长度的结构来编码,这种接口使用单一字节表示0-127的值。大约127的值如下处理。每个字节的低7为用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识为。剩余长度最多可以用四个字节来表示。

数据包类型

位置:第一个字节(byte 1)中的7-4个bit位(bit[7-4]),表示4位无符号值。

通过第一个字节的高4位确定消息报文的类型,4个bit位能确定16中类型,其中0000和1111是保留字段。

报文类型 字段值 数据方向 描述 7-4bit值
保留 0 禁用 保留 0000
CONNECT 1 Client—>Server 客户端连接到服务器 0001
CONNACK 2 Server—>Client 连接确认 0010
PUBLISH 3 Client<—>Server 发布消息 0011
PUBACK 4 Client<—>Server 发布确认(QoS1) 0100
PUBREC 5 Client<—>Server 消息已接收(Qos2第一阶段) 0101
PUBREL 6 Client<—>Server 消息释放(QoS2第二阶段) 0110
PUBCOMP 7 Client<—>Server 发布结束(QoS2第三阶段) 0111
SUBSCRIBE 8 Client—>Server 客户端订阅请求 1000
SUBACK 9 Server—>Client 服务端订阅确认 1001
UNSUBACRIBE 10 Client—>Server 客户端取消订阅 1010
UNSUBACK 11 Server—>Client 服务端取消订阅确认 1011
PINGREQ 12 Client—>Server 客户端发送心跳 1100
PINGRESP 13 Server—>Client 服务端回复心跳 1101
DISCONNECT 14 Client—>Server 客户端 1110
AUTH 15 Client<—>Server 认证数据交换 1111

标志位

位置:第一个字节中的0-3个bit位(bit[0-3])用作报文的标识

首字节的低4位(bit3-bit0)用来表示某些报文类型的控制字段,实际上只有少数报文类型有控制位,如下:

控制报文 固定报头标志 Bit 3 Bit 2 Bit 1 Bit 0
CONNECT 保留 0 0 0 0
CONNACK 保留 0 0 0 0
PUBLISH Used in MQTT v5.0 DUP QoS QoS RETAIN
PUBACK 保留 0 0 0 0
PUBREC 保留 0 0 0 0
PUBREL 保留 0 0 1 0
PUBCOMP 保留 0 0 0 0
SUBSCRIBE 保留 0 0 1 0
SUBACK 保留 0 0 0 0
UNSUBSCRIBE 保留 0 0 1 0
UNSUBACK 保留 0 0 0 0
PINGREQ 保留 0 0 0 0
PINGRESP 保留 0 0 0 0
DISCONNECT 保留 0 0 0 0
AUTH 保留 0 0 0 0
  1. 其中bit3为DUP字段,如果该值为1,表明这个数据包是一条重复的数据;否则该数据包就是第一次发布的消息。
  2. bit2-1为QoS字段;

如果bit 1和bit 2都为0,表示QoS 0:至多一次

如果bit 1为1,表示QoS 1:至少一次

如果bit 2为1,表示QoS 2:只有一次

如果同时将bit 1和bit 2都设置成1,那么客户端或服务器认为这是一条非法的消息,会关闭当前的连接。

对于QoS为0的消息,DUP标志必须设置为0

目前bit [3-0]只在PUBLISH协议中使用有效,并且表中指明了是MQTT5.0版本,对于其他MQTT协议版本,内容可能不同。所有

  • QoS 0消息发布订阅

    image-20230816181556443

  • QoS 1消息发布订阅

    image-20230816181645360

  • QoS 2消息发布订阅

    image-20230816181743738

3.2 可变头(Variable Header)

可变头的意思是可变化的消息头部。有些报文类型包含可变头部有些报文则不包含。可变头部在固定头部和消息内容之间,其内容根据报文类型不同而不同。

image-20230816182927664

协议名

协议名是表示协议名MQTT的UTF-8编码的字符串。MQTT规范的后续版本不会改变这个字符串的偏移和长度。

协议名 说明 7 6 5 4 3 2 1 0
byte 1 长度MSB (0) 0 0 0 0 0 0 0 0
byte 2 长度LSB (4) 0 0 0 0 0 1 0 0
byte 3 ‘M’ 0 1 0 0 1 1 0 1
byte 4 ‘Q’ 0 1 0 1 0 0 0 1
byte 5 ‘T’ 0 1 0 1 0 1 0 0
byte 6 ‘T’ 0 1 0 1 0 1 0 0

支持多种协议的服务端使用协议名字段判断数据是否为MQTT报文。协议名必须是UTF-8字符串“MQTT”。如果服务端不愿意接受CONNECT但希望表明其MQTT服务端身份,可以发送包含原因码为0x84(不支持的协议版本)的CONNACK报文,然后必须关闭网络连接。

协议版本

位无符号值表示客户端的版本等级3.1.1版本的协议等级是4,MQTT v5.0的协议版本字段位5(0x05)

MQTT会话(Clean Session)

MQTT客户端向服务器发起CONNECT请求时,可以通过Clean Session标志设置会话。

Clean Session设置为0,表示创建爱你一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,知道会话超时注销。

Clean Session设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

3.3 消息体(Payload)

有些报文类型是包含payload的,payload的意思是消息载体的意思。

如publish的payload就是指消息内容(应用程序发布的消息内容)。而connect的payload则包含client identifier,will topic,will message,username,password等消息。

包含payload的报文类型如下

报文类型 是否需要payload
CONNECT Required
CONNACK None
PUBLISH Optional
PUBACK None
PUBREC None
PUBREL None
PUBCOMP None
SUBSCRIBE Required
SUBACK Required
UNSUBSCRIBE Required
UNSUBACK None
PINGREQ None
PINGRESP None
DISCONNECT None

4. EMQX简介

MQTT属于是物联网的通讯协议,在MQTT协议中有俩大角色:客户端(发布者/订阅者),服务端(MQTT broker);针对客户端和服务端需要有遵循该协议的具体实现,EMQ/EMQ X就是MQTT broker的一种实现。

EMQ官网:https://www.emqx.com/zh

4.1 EMQ X是什么

EMQ X基于Erlang/OTP平台开发的MQTT消息服务器,是开源社区中最流行的MQTT消息服务器。

EMQ X是开源百万级分布式MQTT消息服务器(MQTT Messaging Broker),用于支持各种接入标准MQTT协议的设备,实现从设备端到服务器端的消息传递,以及从服务器端到设备端的设备控制消息转发。从而实现物联网设备的数据采集,和对设备的操作和控制。

4.2 为什么选择EMQ X

到目前为止,比较流行的MQTT Broker 有几个:

  1. Eclipse Moquitto:https://github.com/eclopse/mosquitto

    使用C 语言实现的MQTT Broker。Eclipse 组织还包含了大量的MQTT客户端项目:https://www.eclipse.org/paho/#

  2. EMQX:https://gihub.com/emqx/emqx

    使用Erlang 语言开发的MQTT Broker,支持许多其他lot 协议比如Coap、LwM2M 等

  3. Mosca:https://github.com/mcollina/mosca

    使用Node.js 开发的MQTT Broker,简单易用。

  4. VerneMQ:https://github.com/vernemq/vernemq

    同样使用Erlang 开发的MQTT Broker

从支持MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX的表现应该是最好的。

与别的MQTT服务器相比EMQ X主要有一下的特点:

  • 经过100+版本的迭代,EMQ X 目前为开源社区中最流行的MQTT消息中间件,在各种客户端严格的生产环境上经受了严苛的考研;
  • EMQ X 支持丰富的物联网协议,包括MQTT、MQTT-SN、CoAP、LwM2M、LoRaWan 和 WebWocket等;
  • 优化的架构设计,支持超大规模的设备连接。企业版单机能支持百万的MQTT连接;集群能支持千万级别的 MQTT连接;
  • 易于安装使用;
  • 灵活的扩展性,支持企业的一些定制场景;
  • 中国本地的技术支持服务,通过微信、QQ等线上渠道快速响应客户需求;
  • 基于Apache 2.0 协议许可,完全卡原。EMQ X 的代码都放在GitHub中,用户可以查看所有源代码。
  • EMQ X 3.0支持 MQTT 5.0 协议,是开源社区中第一个支持5.0协议规范的消息服务器,并且完全兼容 MQTT 3.1 和 3.1.1 协议。除了MQTT协议之外,EMQ X 还支持别的一些物联网协议。
  • 单机支持百万连接,集群支持千万级连接;毫秒级消息转发。EMQ X 中应用了多种技术来实现上述功能。
    • 利用 Erlang/OTP 平台的软实时、高并发和容错(电信领域久经考验的语言)
    • 全异步架构
    • 连接、会话、路由、集群的分层设计
    • 消息平面和控制平面的分离等
  • 扩展模块和插件,EMQ X 提供了领过的扩展机制,可以实现私有协议、认证鉴权、数据持久化、桥接转发和管理控制台等的扩展
  • 桥接:EMQ X 可以跟别的消息系统进行对接,比如 EMQ X Enterprise 版本中可以支持将消息转发到 Kafka、RabbitMQ 或者别的EMQ节点等
  • 共享订阅:共享订阅支持通过负载均衡的方式在多个订阅者之间来分发MQTT消息。比如针对物联网等数据采集场景,会有比较多的设备在发送数据,通过共享订阅的方式可以在订阅端设置多个订阅者来实现这几个订阅者之间的工作负载均衡

4.3 EMQ X 与物联网平台的关系是什么

典型的物联网平台包括设备硬件、数据采集、数据存储、分析、web/移动应用等。EMQ X 位于数据采集这一层,分别与硬件和数据存储、分析进行交互,是物联网平台的核心:前端你的硬件通过MQTT协议与位于数据采集层的EMQ X 交互,通过 EMQ X 将数据采集后,通过EMQ X 提供的数据接口,将数据保存到后台的持久化平台中(各种关系型数据库和NOSQL数据库),或者流失数据处理框架等,上层应用通过这些数据分析后得到的结果呈现给最终用户。

4.4 EMQ X 有哪些产品

EMQ X 公司主要提供三个产品,可在官网首页产品导航查看每一种产品;主要体现在支持的连接数量、产品功能和商业服务等方法面的区别;

  • EMQ X Broker:EMQ X 开源版,完全支持 MQTT V3.1.1/V5.0 协议规范,完整支持 TCP 、TLS、WebSocket 连接,支持百万级连接和分布式集群架构;LDAP,MySQL,Redis,MongoDB 等扩展插件集成,支持插件模式扩展服务器功能;支持跨Linux、Windows,macOS 平台安装,支持公有云,私有云,k8s/容器部署。
  • EMQ X Enterprise:EMQ X 企业版,在开源版本基础上,支持物联网主流协议 MQTT、MQTT-SN、CoAP/LwM2M、HTTP、WebSocket 一站式设备接入;JT-808/GBT-32960 等行业协议支持,基于TCP/UDP 私有协议的旧网设备接入兼容,多重安全机制与认证鉴权;高并发软实时消息路由;强大灵活的内置规则引擎;企业服务与应用集成;多种数据库持久化支持;消息变换桥接转发kafka;管理监控中心
  • EMQ X Platform:EMQ X 平台版,EMQ X Platform 是面向千万级超大型loT网络和应用,全国首选电信级物联网终端接入解决方案。千万级大容量;多物联网协议;电信级高可靠;卓越5G网络支持;跨云跨IDC部署;兼容历史系统;完善的咨询服务(从咨询到运维)

4.5 EMQ X 消息服务器功能列表

  • 完整的MQTT V3.1/V3.1.1 及 V5.0 协议规范支持
    • QoS0,QoS1,QoS2 消息支持
    • 持久会话与离线消息支持
    • Retained 消息支持
    • Last Will 消息支持
  • TCP/SSL 连接支持
  • MQTT/WebSocket/SSL 支持
  • HTTP 消息发布接口支持
  • $SYS/# 系统主题支持
  • 客户端在线状态查询与订阅支持
  • 客户端 ID 或 IP 地址认证支持
  • 用户名密码认证支持
  • LADP 认证
  • Redis、MySQL、postgresql、MongoDB、HTTP 认证集成
  • 浏览器cookie 认证
  • 基于客户端 ID、IP 地址、用户名的访问控制(ACL)
  • 多服务器节点集群(Cluster)
  • 支持manual、mcast、dns、etcd、k8s 等多种集群发现方式
  • 网络分区自动愈合
  • 消息速率限制
  • 连接速率限制
  • 按分区配置节点
  • 多服务器节点桥接(bridge)
  • MQTT broker 桥接支持
  • Stomp 协议支持
  • MQTT-SN 协议支持
  • CoAP 协议支持
  • Stomp/SockJS 支持
  • 延时 Publish($delay/topic)
  • Flapping 检测
  • 黑名单支持
  • 共享订阅($share/:group/topic)
  • TLS/PSK 支持
  • 规则引擎
    • 空动作(调试)
    • 消息重新发布
    • 桥接数据到MQTT broker
    • 检查(调试)
    • 发送数据到web服务

4.6 EMQ X 服务端环境搭建与配置

4.6.1 安装

EMQ X 目前支持绝大多数的平台

官方下载页面

产品部署建议Linux 服务器,不推荐Windows服务器

安装方式较多,源码编译、压缩包、容器化部署

参考centos7 安装docker

创建挂载路径

mkdir -p /home/emqx/{data,log}
cd /home/emqx
chmod 777 data
chmod 777 log

获取docker 镜像

docker pull emqx/emqx:5.1.4

启动docker容器

docker run -d --name emqx \
-p 1883:1883 \
-p 8083:8083 \
-p 8084:8084 \
-p 8883:8883 \
-p 18083:18083 \
-v /home/emqx/data:/opt/emqx/data \
-v /home/emqx/log:/opt/emqx/log \
emqx/emqx:5.1.4

访问运行环境ip:18083 ,默认账号/密码:admin/public