EMQX 是一款高度可扩展、高性能的开源 MQTT 消息代理(Message Broker),用于处理海量物联网设备的消息通信。它是基于 Erlang/OTP 语言开发的,具有卓越的并发处理能力和稳定性。EMQX 主要应用于物联网(IoT)领域,帮助设备通过 MQTT 协议进行消息发布和订阅,广泛用于智慧城市、智能制造、车联网、能源管理等领域。
本文将介绍如何通过 EMQX规则引擎, 将数据数据存储到 Datalayers 中。
向 Datalayers 中写入数据,需提前创建 database
与 table
,可使用 Datalayers 命令行工具 dlsql
来进行创建,在示例中将提前创建。
> create database demo
Query OK, 0 rows affected. (0.001 sec)
> use demo
Database changed to `demo`
> CREATE TABLE `sensor_info` (
time TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
temp DOUBLE,
timestamp key(time))
PARTITION BY HASH (sid) PARTITIONS 1
ENGINE=TimeSeries
Query OK, 0 rows affected. (0.034 sec)
集成
> 连接器
。Datalayers
,在搜索结果中选择 Datalayers, 点击下一步。注:服务器通讯地址填写 `Datalayers HTTP 地址`
创建
,保存资源配置。集成
> 规则
页面,在左侧 SQL 编辑器中填写相应的 SQL 规则,点击右侧 创建
。如下图:
写语句
的编辑器中,写入相应的语句模板。保存
。此时,经过以上几步实现了使用 EMQX 规则引擎将数据写入到 Datalayers 中。
通过以上配置后,我们通过向 EMQX 发布消息(topic以 t/
开头),相应的数据就会自动保存到 Datalayers 中。我们使用 EMQX Dashboard 中提供的问题分析工具 > WebSocket 客户端来演示说明。
t/#
进行通配订阅,方便后续观察。t/1
,Payload 框中输入 { "sid": "1", "temp":25.6 }
,点击发布,此时我们将看到 pub 与 sub 的消息,如下:
此时,说明已经成功将数据 pub 到了 MQTT Broker 中。
demo> select * from sensor_info;
+-------------------------------+-----+------+
| time | sid | temp |
+-------------------------------+-----+------+
| 2024-09-12T20:26:58.804+08:00 | 1 | 25.6 |
| 2024-09-12T20:29:16.010+08:00 | 1 | 25.7 |
+-------------------------------+-----+------+
2 rows in set (0.001 sec)
关于命令行工具,更详细的用法请参考命令行工具。
本文详细介绍了如何利用 EMQX 规则引擎将物联网数据存储到 Datalayers 中,为用户提供了一个强力的处理和存储海量 IoT 数据的方案。使得用户可以轻松地将设备产生的数据流通过 MQTT 协议管理并持久化存储。这种集成方法可显著简化数据管理流程,促进业务应用的快速部署和扩展,不仅提升了数据处理的效率,还确保了数据的实时性和可靠性。