该案例基于百度云和百度天工的物联网技术,介绍一个通过一个流式计算的任务,监控众多在线设备的方法,仅供参考。

百度云流式计算的介绍可以参考:https://cloud.baidu.com/forum/topic/show?topicId=262273

背景

某烟草客户拥有1000多个烤烟房,每个烤烟房通过一个物解析网关,每分钟采集一次烤烟房的温度、湿度等数据,报送到云端,以实时监控烤房内部温湿度是否正常。

通过为每一个烤烟房,建立一条告警,设定越限条件。当该烤烟房的温湿度越限,则触发短信告警。

但是,上述方案有一个隐含先决条件:网关总是能正常地上报数据。

我们知道,导致网关没法上报数据的因素有很多,比如供电故障、网络信号差、环境温度过高等等,都可能导致网关意外停止工作。而上述监控方案是不足以发现这种情况的。

利用百度云流式计算进行大规模设备监控

 

这时候,流式计算就派上用场了。客户非常巧妙地利用到了流式计算的窗口和时间概念,设计的流式监控任务有如下特性:

·        一个任务,监控上述1000多个设备;

·        能够实时并且准确的报告具体哪一个网关,什么时候停止了数据的上报。

·        增加新网关设备时,无需修改任务。

 

实现步骤

整体架构如下下图所示:

利用百度云流式计算进行大规模设备监控

 

具体技术步骤:

1,在物解析的轮询请求设置中,设置网关对Modbus设备的采集周期为60秒

利用百度云流式计算进行大规模设备监控

 

2,每个解析项目解析后的数据转到统一的物接入主题:/modbus/parsed

利用百度云流式计算进行大规模设备监控

解析后的数据格式(关键部分)如下:

{

        "gatewayid":"3faedb39-77ff-45b6-ac7e-c1739077e4d5"

}

   因为只对网关id进行监控,其他数据无关,因此省略。

 

在流式计算中:

3,创建一个数据源,名为: device_offline_monitor_src;数据源的物接入主题来自上面解析后的主题:/modbus/parsed

利用百度云流式计算进行大规模设备监控

 

4,创建MQTT数据目的地,名为: device_offline_notice_sink,以接收异常设备告警。物接入主题为:/modbus/device/offline_notice

利用百度云流式计算进行大规模设备监控

 

5,创建流式计算任务

     时间类型选:PROCESSTIME

     SQL 语句为:

 INSERT INTOdevice_offline_notice_sink

 SELECT

       Session_End(rowtime,INTERVAL '65' SECOND  ) AS ts_end,

       gatewayid

FROM device_offline_monitor_src

GROUP BY

       Session(rowtime, INTERVAL'65' SECOND ),

       gatewayid

利用百度云流式计算进行大规模设备监控

 

当异常发生时,从数据目的地主题/modbus/device/offline_notice能收到如下格式的消息:

{

            "ts_end":1526459666537,

            "gatewayid":"3faedb39-77ff-45b6-ac7e-c1739077e4d5"

}

其中:ts_end为设备停止上报数据的时间,单位为毫秒。即设备上一次上报的时间为ts_end – 65000。gatewayid为网关id。

      

说明:

1,这里演示的是输出MQTT报警消息,事实上客户通过规则引擎,将该MQTT消息转化成短信告警。

2,流式计算任务里面使用的时间类型为PROCESSTIME,这样即便监控的是一个设备,也能照样工作。

3,流式计算任务里面采用的会话窗口,当一直有数据流进来,窗口始终不关闭,也就是始终没有输出。一旦某个设备规定时间没有数据流进来,对应的会话窗口就会关闭,就会输出一条消息。以此触发告警。

4,会话窗口选择了65秒。理想情况下,只要60秒没有收到消息,即认为网关故障。但考虑到网络传输上可能导致的延迟,因此,这里再增加了5秒等待,以免因细小的传输延迟导致误报。