该案例基于百度云和百度天工的物联网技术,介绍一个通过一个流式计算的任务,监控众多在线设备的方法,仅供参考。
百度云流式计算的介绍可以参考:https://cloud.baidu.com/forum/topic/show?topicId=262273
背景
某烟草客户拥有1000多个烤烟房,每个烤烟房通过一个物解析网关,每分钟采集一次烤烟房的温度、湿度等数据,报送到云端,以实时监控烤房内部温湿度是否正常。
通过为每一个烤烟房,建立一条告警,设定越限条件。当该烤烟房的温湿度越限,则触发短信告警。
但是,上述方案有一个隐含先决条件:网关总是能正常地上报数据。
我们知道,导致网关没法上报数据的因素有很多,比如供电故障、网络信号差、环境温度过高等等,都可能导致网关意外停止工作。而上述监控方案是不足以发现这种情况的。
这时候,流式计算就派上用场了。客户非常巧妙地利用到了流式计算的窗口和时间概念,设计的流式监控任务有如下特性:
· 一个任务,监控上述1000多个设备;
· 能够实时并且准确的报告具体哪一个网关,什么时候停止了数据的上报。
· 增加新网关设备时,无需修改任务。
实现步骤
整体架构如下下图所示:
具体技术步骤:
1,在物解析的轮询请求设置中,设置网关对Modbus设备的采集周期为60秒
2,每个解析项目解析后的数据转到统一的物接入主题:/modbus/parsed
解析后的数据格式(关键部分)如下:
{
"gatewayid":"3faedb39-77ff-45b6-ac7e-c1739077e4d5"
}
因为只对网关id进行监控,其他数据无关,因此省略。
在流式计算中:
3,创建一个数据源,名为: device_offline_monitor_src;数据源的物接入主题来自上面解析后的主题:/modbus/parsed
4,创建MQTT数据目的地,名为: device_offline_notice_sink,以接收异常设备告警。物接入主题为:/modbus/device/offline_notice
5,创建流式计算任务
时间类型选:PROCESSTIME
SQL 语句为:
INSERT INTOdevice_offline_notice_sink
SELECT
Session_End(rowtime,INTERVAL '65' SECOND ) AS ts_end,
gatewayid
FROM device_offline_monitor_src
GROUP BY
Session(rowtime, INTERVAL'65' SECOND ),
gatewayid
当异常发生时,从数据目的地主题/modbus/device/offline_notice能收到如下格式的消息:
{
"ts_end":1526459666537,
"gatewayid":"3faedb39-77ff-45b6-ac7e-c1739077e4d5"
}
其中:ts_end为设备停止上报数据的时间,单位为毫秒。即设备上一次上报的时间为ts_end – 65000。gatewayid为网关id。
说明:
1,这里演示的是输出MQTT报警消息,事实上客户通过规则引擎,将该MQTT消息转化成短信告警。
2,流式计算任务里面使用的时间类型为PROCESSTIME,这样即便监控的是一个设备,也能照样工作。
3,流式计算任务里面采用的会话窗口,当一直有数据流进来,窗口始终不关闭,也就是始终没有输出。一旦某个设备规定时间没有数据流进来,对应的会话窗口就会关闭,就会输出一条消息。以此触发告警。
4,会话窗口选择了65秒。理想情况下,只要60秒没有收到消息,即认为网关故障。但考虑到网络传输上可能导致的延迟,因此,这里再增加了5秒等待,以免因细小的传输延迟导致误报。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:利用百度云流式计算进行大规模设备监控 - Python技术站