概述随着物联网技术的零基快速发展,MQTT(Message Queuing Telemetry Transport)消息队列遥测传输协议,础教作为一种轻量级的自建通讯协议,被广泛应用于物联网设备之间的服务通讯。 MQTT 是器并一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。实现双MQTT最大优点在于,通讯可以以极少的零基代码和有限的带宽,为连接远程设备提供实时可靠的础教消息服务。  图片
 本次教程中,自建将探讨如何基于EMQX平台自建MQTT服务器,服务并实现设备之间的器并高效通讯。无论是实现双在工业控制、智能家居还是通讯智能城市等领域,搭建自己的零基MQTT服务器都能为我们带来更大的灵活性和可扩展性。让我们一起深入了解这个过程,为物联网应用打下坚实的基础。 MQTT通信的架构 图片
 上面架构图来自EMQX官网,其中中间绿色部分即我们要搭建的源码下载MQTT Broker,MQTT服务器搭建完成后,我们可以通过各种编程语言类库对服务器发起连接请求,以及主题发布和订阅。而编程语言类库中我们就可以使用workerman的mqtt扩展库与服务器进行通信。 MQTT概念Publisher(发布者):消息的发出者,负责发送消息。Subscriber(订阅者):消息的订阅者,负责接收并处理消息。Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。Payload(负载);可以理解为发送消息的内容。QoS(消息质量):全称 Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复; QoS 1(Atleast Once):至少一次,确保消息到达,免费源码下载但消息重复可能会发生; QoS 2(Exactly Once):只有一次,确保消息只到达一次。 EMQXEMQX平台作为一款开源的MQTT消息服务器,提供了稳定可靠的消息传输服务。本次教程中,我们将探讨如何基于EMQX平台自建MQTT服务器,并实现设备之间的高效通讯。无论是在工业控制、智能家居还是智能城市等领域,搭建自己的MQTT服务器都能为我们带来更大的灵活性和可扩展性。 EMQX 官网:https://www.emqx.io  图片
 安装 MQTTX 服务端本次教程中,我们将使用 Docker 部署,使用 Docker 指令直接部署,可以使用以下命令 获取 Docker 镜像复制docker pull emqx/emqx1.                                             图片
 启动 Docker 容器复制docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest1.                                            访问仪表板安装完成后,打开浏览器,并在地址栏中输入 http://localhost:18083以访问 EMQX 仪表板,您可以从那里连接到客户端或检查运行状态。  图片 复制默认用户名: admin                        默认密码:public1.2.                                            进入 EMQX 管理页面:  图片
 安装 MQTTX 客户端MQTTX 客户端我们选用workerman的mqtt扩展库与服务器进行通信。workerman/mqtt 是一个基于workerman的异步mqtt 客户端库,可用于接收或者发送mqtt协议的消息。支持QoS 0、QoS 1、网站模板QoS 2。支持MQTT、3.1、3.1.1、5版本。 安装复制composer require workerman/mqtt1.                                            订阅客户端subscribe.php 代码: 复制<?php                        /**                            * @desc Subscriber(订阅者):消息的订阅者,负责接收并处理消息。                            * @author Tinywan(ShaoBo Wan)                            * @date 2024/5/30 20:35                            */                        declare(strict_types=1);                        require_once __DIR__ . /../vendor/autoload.php;                        use Workerman\Worker;                        $worker = new Worker();                        $worker->onWorkerStart = function () {                        $options = [                        username => Tinywan,                        password => 123456,                        ];                        $mqtt = new Workerman\Mqtt\Client(mqtt://192.168.13.168:1883, $options);                        $mqtt->onConnect = function ($mqtt) {                        // 主题 Topic 可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。                        $topic = resty;                        $mqtt->subscribe($topic);                        };                        $mqtt->onMessage = function ($topic, $content) {                        echo [订阅者][收到主题]: . $topic . PHP_EOL;                        echo [订阅者][收到内容]: . $content . PHP_EOL;                        };                        $mqtt->connect();                        };                        Worker::runAll();1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.                                                                命令行运行 php subscribe.php start 启动:  图片
 启动成功后,即可看到设备已经成功连接到服务器。我们在 EMQX 服务器中的客户端页面中可以查看到设备的连接状态。  图片
 客户端发布接下来,我们测试发布和订阅主题是否正常。 publish.php 代码: 复制<?php                        /**                            * @desc Publisher(发布者):消息的发出者,负责发送消息。                            * @author Tinywan(ShaoBo Wan)                            * @date 2024/5/30 20:46                            */                        declare(strict_types=1);                        require_once __DIR__ . /../vendor/autoload.php;                        use Workerman\Worker;                        $worker = new Worker();                        $worker->onWorkerStart = function () {                        $mqtt = new Workerman\Mqtt\Client(mqtt://192.168.13.168:1883);                        $mqtt->onConnect = function ($mqtt) {                        // 主题 Topic                        $topic = resty;                        // 负载 Payload 可以理解为发送消息的内容                        $payload = Hello Tinywan mqtt;                        $mqtt->publish($topic, $payload);                        };                        $mqtt->connect();                        };                        Worker::runAll();1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.                                                                命令行运行 php publish.php start 发布消息。 以下是 订阅客户端 subscribe.php收到的消息:  图片
 至此,我们已经成功实现了 客户端 与 EMQX 服务器之间的通讯。 主题订阅 图片
 主题监控 图片
 Websocket 工具订阅消息 图片
 发布消息通过websocket发布消息:  图片
 MQTT客户端订阅消息:  
 使用 MQTT.js 库MQTT.js 是一个开源的 MQTT 协议的客户端库,使用 JavaScript 编写,主要用于 Node.js 和 浏览器环境中。是JavaScript 环境下的 MQTT 客户端库。可以用于微信小程序、支付宝小程序等定制浏览器环境。 可以直接在HTML文件中进行调用: 复制<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>1.                                            实现简单功能,EMQX websocket 连接服务地址 复制ws://localhost:8083/mqtt1.                                            MQTT.html 案例代码 复制<!DOCTYPE html>                        <html>                        <head>                        <meta charset="UTF-8">                        <title>开源技术小栈</title>                        </head>                        <body>                        <h3>零基础教你自建MQTT服务器并实现通讯</h3>                        <div style="height: 800px;">                        <label>【客户端】【目标Topic】:<input id="targetTopicInput" type="text"></label><br>                        <label>【客户端】【发送的消息】:<input id="messageInput" type="text"></label><br>                        <button notallow="sendMessage()">发送</button>                        <button notallow="clearMessage()">清空</button>                        <div id="messageDiv"></div>                        </div>                        </body>                        <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>                        <script>                        /**                            * EMQX websocket 连接地址                            * @type {string}                            */                        const url = ws://localhost:8083/mqtt;                        /**                            * 订阅的topic                            * @type {string}                            */                        const topic = resty;                        /**                            * 连接到消息队列                            */                        let client = mqtt.connect(url);                        client.on(connect, function () {                        /**                            * 连接成功后订阅topic                            */                        client.subscribe(topic, function (err) {                        if (!err) {                        showMessage("[订阅者][Topic主题]:" + topic + "成功!");                        }                        });                        });                        /**                            * 获取订阅topic中的消息                            */                        client.on(message, function (topic, message) {                        showMessage("[订阅者][收到消息]:" + message.toString());                        });                        /**                            * 发送消息                            */                        function sendMessage() {                        let targetTopic = document.getElementById("targetTopicInput").value;                        let message = document.getElementById("messageInput").value;                        // 向目标topic中发送消息                        client.publish(targetTopic, message);                        showMessage("[发送消息给]" + targetTopic + "的消息:" + message);                        }                        /**                            * 从URL中获取参数                            * @param name                            * @returns {null|string}                            */                        function getQueryString(name) {                        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");                        let r = window.location.search.substr(1).match(reg);                        if (r != null) {                        return decodeURIComponent(r[2]);                        }                        return null;                        }                        /**                            * 在消息列表中展示消息                            * @param message                            */                        function showMessage(message) {                        let messageDiv = document.getElementById("messageDiv");                        let messageEle = document.createElement("div");                        messageEle.innerText = message;                        messageDiv.appendChild(messageEle);                        }                        /**                            * 清空消息列表                            */                        function clearMessage() {                        let messageDiv = document.getElementById("messageDiv");                        messageDiv.innerHTML = "";                        }                        </script>                        </html>1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.                                                                发送消息:  图片
 接受消息:  图片
 小结根据官方文档我们知道publish和subscribe流程如何实现。对于subscribe由于需要实时获取来自硬件方面的数据或其他客户端的数据,因此subscribe需要以cli模式守护运行在系统后台。但是publish消息一般跟系统内的业务逻辑相关。可以通过MQTT客户端和Websocket客户端发送消息。  |