
EMQX 是一款高度可扩展、高性能的开源 MQTT 消息代理(Message Broker),用于处理海量物联网设备的消息通信。它是基于 Erlang/OTP 语言开发的,具有卓越的并发处理能力和稳定性。EMQX 主要应用于物联网(IoT)领域,帮助设备通过 MQTT 协议进行消息发布和订阅,广泛用于智慧城市、智能制造、车联网、能源管理等领域。
本文将介绍如何通过 EMQX 规则引擎 将数据写入 Datalayers。
向 Datalayers 写入数据前,需要先创建 database 与 table。可使用 Datalayers 命令行工具 dlsql 完成创建。
> create database demo
Query OK, 0 rows affected. (0.001 sec)
> use demo
Database changed to `demo`
> CREATE TABLE `sensor_info` (
time TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
temp DOUBLE,
timestamp key(time))
PARTITION BY HASH (sid) PARTITIONS 1
ENGINE=TimeSeries
Query OK, 0 rows affected. (0.034 sec)
集成 > 连接器。Datalayers,在搜索结果中选择 Datalayers, 点击下一步。
注:服务器通讯地址填写 `Datalayers HTTP 地址`
创建,保存资源配置。集成 > 规则 页面,在左侧 SQL 编辑器中填写相应的 SQL 规则,点击右侧 创建。如下图:

在写语句的编辑器中,写入相应的语句模板。保存。完成以上步骤后,即可通过 EMQX 规则引擎将消息数据写入 Datalayers。
通过以上配置后,我们通过向 EMQX 发布消息(topic以 t/ 开头),相应的数据就会自动保存到 Datalayers 中。我们使用 EMQX Dashboard 中提供的问题分析工具 > WebSocket 客户端来演示说明。
t/# 进行通配订阅,方便后续观察。t/1,Payload 框中输入 { "sid": "1", "temp":25.6 },点击发布,此时我们将看到 pub 与 sub 的消息,如下:

此时,说明已经成功将数据 pub 到了 MQTT Broker 中。
demo> select * from sensor_info;
+-------------------------------+-----+------+
| time | sid | temp |
+-------------------------------+-----+------+
| 2024-09-12T20:26:58.804+08:00 | 1 | 25.6 |
| 2024-09-12T20:29:16.010+08:00 | 1 | 25.7 |
+-------------------------------+-----+------+
2 rows in set (0.001 sec)
关于命令行工具,更详细的用法请参考命令行工具。
本文介绍了通过 EMQX 规则引擎将物联网数据写入 Datalayers 的实现方法。该集成方式能够简化数据接入链路,支持设备消息的实时落库与后续分析,适用于 IoT 数据平台建设。
高性能、云原生的时序数据存储引擎,轻松应对海量数据的写入与查询