从 Modbus 到 Web 数据可视化之 WebSocket 实时消息

时间:2024-01-21 15:52:11

前言

  工业物联网是一个范围很大的概念,本文从数据可视化的角度介绍了一个最小化的工业物联网平台,从 Modbus 数据采集到前端数据可视化呈现的基本实现思路。这里面主要涉及基于 Modbus 通讯规约的数据采集、后台实时数据处理、前端实时数据接收、前端实时数据可视化显示。物联网平台架构主要参考了图扑物联工业物联网平台,并从中提取了部分功能进行介绍,前端数据可视化采用的是HT for Web

  由于内容比较多,具体实现上涉及到前端工程师、后台工程师、数据采集工程师等多个开发角色的参与,所以本文重点介绍实现思路和 WebSocket 消息推送的实现,其它环节的具体实现细节作者会在其它文章中进行详细介绍。

 

一、物联网平台架构

  物联网平台主要是B/S模式,工业物联网平台大都采用的是微服务架构,本文主要涉及两个微服务:前置数据采集服务和 Web 实时消息推送服务。

  前置数据采集服务主要用于现场设备、仪器、仪表、传感器实时数据的采集,IoTopo工业物联网平台支持MQTT和透传云解析两种方式,透传云解析支持 Modbus 通讯规约。

  实时数据采集到平台后,需要推送到浏览器端进行显示,Web 实时消息推送服务采用 Web Socket 进行实时数据推送,可以确保数据的实时性和高效性。

 

  前端可视化技术采用的是HT for Web, HT for Web 是基于HTML5标准的企业应用图形界面一站式解决方案,其包含通用组件、拓扑组件和3D渲染引擎等丰富的图形界面开发类库。虽然 HT for Web 是商业软件但其提供的一站式解决方案可以极大缩短产品开发周期、减少研发成本、补齐我们在 Web 图形界面可视化技术上的短板。

 

二、Modbus 数据采集

Modbus是一种串行通信协议,是Modicon公司(现在的施耐德电气Schneider Electric)于1979年为使用可编程逻辑控制器(PLC)通信而发表。Modbus已经成为工业领域通信协议的业界标准,并且现在是工业电子设备之间常用的连接方式。Modbus比其他通信协议使用的更广泛的主要原因有:

  1. 公开发表并且无版权要求
  2. 易于部署和维护
  3. 对供应商来说,修改移动本地的比特或字节没有很多限制

Modbus允许多个 (大约240个) 设备连接在同一个网络上进行通信,举个例子,一个由测量温度和湿度的装置,并且将结果发送给计算机。在数据采集与监视控制系统(SCADA)中,Modbus通常用来连接监控计算机和远程终端控制系统(RTU)。

目前主流的编辑语言都有 Modbus 开发库,由于 Modbus 相对比较简单,很多企业也选择自行开发实现。Modbus 数据采集属于后台通讯,数据采集到平台后首先会进行数据清理和预处理,过滤掉冗余和无效数据,形成实时数据。平台获取到实时数据后一般会做 3 项工作:

1. 推送到 Web 前端进行显示

2. 存储到时序数据库

3. 判断是否产生告警

 

 

三、将实时数据推送到 Web 前端

  基于 Web 的实时数据推送需要用到 WebSocket,初学者可以学习阮一峰老师的 WebSocket 教程。我们基于 WebSocket 封装了一套消息传输协议,类似于一个消息中间件,前端部分可以订阅实时数据。考虑到海量实时数据的推送需求,将实时数据分为平台级、站点级、设备级,前端在订阅实时数据时,可以通过消息主题规则订阅不同级别的数据。平台侧在收到订阅请求时,可以主动推送一次实时数据。这样可以确保数据可视化界面在订阅实时数据成功后,第一时间显示出正确的界面。

  下面给出一个简化的 WebSocket 消息协议的客户端代码,大家可以在些基础上进行改造以适合自己的业务场景。

  消息主题正则表达式,用来匹配消息主题:

1 const matchWildcard = function(str, rule) {
2     return new RegExp(\'^\' + rule.split(\'*\').join(\'.*\') + \'$\').test(str)
3 }

  WebSocket 客户端,支持消息主题订阅、取消消息主题订阅、同一个消息主题支持多个订阅者:

  1 class WebSocketClient {
  2     constructor() {
  3         this.ws = null
  4         this.opts = {
  5             debug: false,
  6             autoReconnect: true,
  7             reconnectInterval: 10000,
  8             subscriber: {},
  9         }
 10         this.opened = false
 11     }
 12 
 13     connect() {
 14         if (!this.opened) {
 15             return
 16         }
 17 
 18         const url = \'ws://www.iotopo.com/msg/v1\'
 19         console.debug(\'websocket connect\', url)
 20 
 21         let ws = this.ws = new WebSocket(url)
 22         ws.onmessage = event => {
 23             if (this.opts.debug) {
 24                 console.log(event)
 25             }
 26             let data = JSON.parse(event.data)
 27 
 28             for (let topic in this.opts.subscriber) {
 29                 if (matchWildcard(data.topic, topic)) {
 30                     let listeners = this.opts.subscriber[topic]
 31                     if (Array.isArray(listeners)) {
 32                         listeners.forEach(cb => {
 33                             if (typeof cb === \'function\') {
 34                                 cb(data.payload)
 35                             }
 36                         })
 37                     }
 38                 }
 39             }
 40         }
 41         ws.onopen = e => {
 42             if (this.opts.debug) {
 43                 console.log(e)
 44             }
 45             // 执行订阅请求
 46             for (let topic in this.opts.subscriber) {
 47                 this._sendSubscribe(topic)
 48             }
 49             if (typeof this.opts.onopen === \'function\') {
 50                 this.opts.onopen(e)
 51             }
 52         }
 53         ws.onclose = e => {
 54             if (this.opts.debug) {
 55                 console.log(e)
 56             }
 57             if (typeof this.opts.onclose === \'function\') {
 58                 this.opts.onclose(e)
 59             }
 60             if (this.opened && this.opts.autoReconnect) {
 61                 setTimeout(() => {
 62                     this.connect()
 63                 }, this.opts.reconnectInterval)
 64             }
 65         }
 66         ws.onerror = e => {
 67             if (this.opts.debug) {
 68                 console.log(e)
 69             }
 70             if (typeof this.opts.onerror === \'function\') {
 71                 this.opts.onerror(e)
 72             }
 73         }
 74     }
 75 
 76     open(opts) {
 77         if (!this.opened) {
 78             Object.assign(this.opts, opts || {})
 79             this.opened = true
 80             this.connect()
 81         }
 82     }
 83 
 84     close() {
 85         this.opened = false
 86         if (this.ws !== null) {
 87             this.ws.close()
 88         }
 89         this.ws = null
 90     }
 91 
 92     isOpened() {
 93         return this.opened
 94     }
 95 
 96     isConnected() {
 97         return this.ws !== null
 98     }
 99 
100     _sendSubscribe(topic) {
101         if (this.ws === null) {
102             return Error(\'websocet not opened\')
103         }
104         if (typeof topic !== \'string\') {
105             return Error(\'topic should be a string value\')
106         }
107 
108         if (this.ws.readyState === WebSocket.OPEN) {
109             let msg = {
110                 type: \'subscribe\',
111                 topic: topic,
112             }
113             this.ws.send(JSON.stringify(msg))
114         } else {
115             return Error(\'websocet not connected\')
116         }
117     }
118 
119     subscribe(topic, cb) {
120         if (this.opts.debug) {
121             console.log(\'subscribe:\', topic)
122         }
123         let listeners = this.opts.subscriber[topic]
124         if (!Array.isArray(listeners)) {
125             listeners = [
126                 cb
127             ]
128             this.opts.subscriber[topic] = listeners
129         } else {
130             listeners.push(cb)
131         }
132         this._sendSubscribe(topic)
133 
134         return { topic, cb }
135     }
136 
137     unsubscribe({topic, cb}) {
138         if (this.opts.debug) {
139             console.log(\'unsubscribe:\', topic)
140         }
141 
142         if (this.ws === null) {
143             return Error(\'websocet not opened\')
144         }
145 
146         if (typeof topic !== \'string\') {
147             return Error(\'topic should be a string value\')
148         }
149 
150         let listeners = this.opts.subscriber[topic]
151         if (cb) {
152             if (Array.isArray(listeners)) {
153                 let idx = listeners.indexOf(cb)
154                 if (idx >= 0) {
155                     listeners.splice(idx, 1)
156                 }
157             }
158         } else {
159             delete this.opts.subscriber[topic]
160         }
161 
162         if (Array.isArray(listeners) && listeners == 0) {
163             if (this.ws.readyState === WebSocket.OPEN) {
164                 let msg = {
165                     type: \'unsubscribe\',
166                     topic: topic,
167                 }
168                 this.ws.send(JSON.stringify(msg))
169             } else {
170                 return Error(\'websocet not connected\')
171             }
172         }
173     }
174 }

  用法举例:

 1 // 初始化客户端
 2 const ws = new WebSocketClient()
 3 // 与 WebSocket 服务器建议连接
 4 ws.open({
 5     debug: false
 6 })
 7 // 订阅消息
 8 ws.subscribe(\'/foo/bar/*\', function(msg) {
 9     console.log(\'recv ws msg:\', msg)
10 })

 

四、数据可视化界面实现

  基于 HT for Web 可以简单快速地搭建一个符合 HTML5 标准的可视化图形界面,通过 WebSocket 订阅实时数据,然后驱动图形界面的变化。数据驱动图形界面变化的实现方式很多,基本方法是采用数据绑定的方式,具体可以参考 HT for Web 的官方文档

在后面的文章中,作者会介绍一种基于 HT for Web 实现的业务数据和图形数据分离的数据绑定方法。

 

在线演示地址