自定义驱动开发指南
自定义驱动介绍
BIE提供了子设备管理能力,针对不同类型的子设备,需要通过驱动完成BIE与子设备间的适配工作。BIE已经提供了Modbus、OPC-UA、IPC、IEC-104的系统驱动。除此之外的子设备,需要通过BIE提供的SDK(https://github.com/baetyl/baetyl-go) 开发自定义驱动,以完成BIE与子设备间的适配工作。
自定义驱动复用原设备接入框架,就可以实现数据接入,不需要bie做任何修改,快速实现自定义设备接入。
BIE各模块与驱动交互逻辑如下图所示
上图中包含几个主要模块
- baetyl-cloud:BIE云端管理服务
- baetyl-core:BIE边缘核心服务
- baetyl-broker:mqtt broker
- custom driver:自定义驱动
- device1、device2:子设备 驱动位于BIE 边缘组件 baetyl-core 和子设备之间,与baetyl-core之间通过baetyl-broker进行解耦。驱动与baetyl-broker,baetyl-broker与baetyl-core之间都是通过mqtt协议通信。一方面,驱动与子设备对接,需要通过协议读取子设备的数据后需要发送至baetyl-broker,再通过baetyl-core发送至云端baetyl-cloud。另一方面,驱动接收来自baetyl-broker的数据,写入子设备。baetyl-broker发送至驱动的数据来自于云端baetyl-cloud。
Golang驱动开发SDK
配置文件
SDK会从配置文件中获取设备、驱动、接入模板的数据,配置文件都放在驱动pod里的 /etc/baetyl/ 目录下
sub_devices.yml
『节点管理』-『子设备管理』中的驱动配置和子设备配置信息存放在sub_device.yml中
- devcies[i].accessConfig.custom为子设备的配置
- driver为驱动配置
devices:
- name: device1
version: 1663240313luvrs4
deviceModel: device-model
accessTemplate: device-access-tpl
deviceTopic:
delta:
qos: 1
topic: thing/device-model/device1/property/invoke
report:
qos: 1
topic: thing/device-model/device1/property/post
event:
qos: 1
topic: thing/device-model/device1/raw/c2d
get:
qos: 1
topic: $baetyl/device/device1/get
getResponse:
qos: 1
topic: $baetyl/device/device1/getResponse
eventReport:
qos: 1
topic: thing/device-model/device1/event/post
propertyGet:
qos: 1
topic: thing/device-model/device1/property/get
lifecycleReport:
qos: 1
topic: thing/device-model/device1/lifecycle/post
accessConfig:
custom: |-
channelId: test-chan-01
machineNumber: N001L01.101
driver: |-
channels:
- name: test-chan-01
address: 192.168.0.1:23
interval: 30s
models.yml
『子设备管理』-『产品』中的产品测点信息存放在models.yml中
device-model:
- name: switch
type: bool
mode: rw
- name: temperature
type: float32
mode: ro
- name: humidity
type: float32
mode: ro
- name: high-temperature-threshold
type: int32
mode: rw
- name: high-temperature-alarm
type: bool
mode: ro
access_template.yml
『子设备管理』-『接入模板』中的设备点表和物模型点位映射信息存放在access_template.yml中
- device-access-tpl.properties[i].visitor.custom为设备点表信息中的采集配置
- device-access-tpl.properties[i].mapping为物模型点位映射信息
device-access-tpl:
properties:
- name: 高温报警
id: "1"
type: bool
visitor:
custom: |-
start: 1
offset: 14
mappings:
- attribute: high-temperature-alarm
type: value
expression: x1
主要数据结构
子设备信息结构体 DeviceInfo
type DeviceInfo struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Version string `yaml:"version,omitempty" json:"version,omitempty"`
// Deprecated: Use DeviceTopic instead.
// Change from access template support
Topic `yaml:",inline" json:",inline"`
DeviceModel string `yaml:"deviceModel,omitempty" json:"deviceModel,omitempty"`
AccessTemplate string `yaml:"accessTemplate,omitempty" json:"accessTemplate,omitempty"`
DeviceTopic DeviceTopic `yaml:"deviceTopic,omitempty" json:"deviceTopic,omitempty"`
AccessConfig *AccessConfig `yaml:"accessConfig,omitempty" json:"accessConfig,omitempty"`
}
type DeviceTopic struct {
Delta mqtt2.QOSTopic `yaml:"delta,omitempty" json:"delta,omitempty"`
Report mqtt2.QOSTopic `yaml:"report,omitempty" json:"report,omitempty"`
Event mqtt2.QOSTopic `yaml:"event,omitempty" json:"event,omitempty"`
Get mqtt2.QOSTopic `yaml:"get,omitempty" json:"get,omitempty"`
GetResponse mqtt2.QOSTopic `yaml:"getResponse,omitempty" json:"getResponse,omitempty"`
EventReport mqtt2.QOSTopic `yaml:"eventReport,omitempty" json:"eventReport,omitempty"`
PropertyGet mqtt2.QOSTopic `yaml:"propertyGet,omitempty" json:"propertyGet,omitempty"`
LifecycleReport mqtt2.QOSTopic `yaml:"lifecycleReport,omitempty" json:"lifecycleReport,omitempty"`
}
- Name:子设备名称
- Version:子设备当前版本
-
DeviceTopic:子设备系统主题
- Delta:thing/{产品名}/{设备名}/property/invoke,接收来自云端的属性变更通知,驱动通过该主题接收到消息时通常需要将对应的属性值写入子设备。
- Report:thing/{产品名}/{设备名}/property/post,用于驱动将获取的子设备数据发送至baetyl-core。
- Event:thing/{产品名}/{设备名}/raw/c2d,接收来自云端的事件通知。
- Get:$baetyl/device/{设备名}/get,边缘侧用于驱动向baetyl-core发送消息,获取当前所有子设备的数据。
- GetResponse:$baetyl/device/{设备名}/getResponse,用于接收来自baetyl-core发送的所有子设备的数据,该主题只有在驱动向baetyl-core发送消息后才会接到数据,即向Get主题发送消息后。
- EventReport:thing/{产品名}/{设备名}/event/post,驱动上报事件的主题。
- PropertyGet:thing/{产品名}/{设备名}/property/get,接收来自云端的属性获取请求,驱动接收到后立即进行一次数据采集,并通过Report主题上报。
- LifecycleReport:thing/{产品名}/{设备名}/lifecycle/post,驱动上报设备在离线状态的主题。
子设备连接配置结构体 AccessConfig
type AccessConfig struct {
Modbus *ModbusAccessConfig `yaml:"modbus,omitempty" json:"modbus,omitempty"`
Opcua *OpcuaAccessConfig `yaml:"opcua,omitempty" json:"opcua,omitempty"`
IEC104 *IEC104AccessConfig `yaml:"iec104,omitempty" json:"iec104,omitempty"`
Custom *CustomAccessConfig `yaml:"custom,omitempty" json:"custom,omitempty"`
}
type CustomAccessConfig string
- Modbus、Opcua、IEC104字段为系统驱动使用,开发自定义驱动时不需要使用。
- CustomAccessConfig对应每个子设备的连接配置信息且是字符串,具体格式可能为Yaml或JSON,驱动根据实际进行相应解析。即2.1.1节中的子设备配置和驱动配置。
子设备属性结构体 DeviceProperty
type DeviceProperty struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Id string `yaml:"id,omitempty" json:"id,omitempty"`
Type string `yaml:"type,omitempty" json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Mode string `yaml:"mode,omitempty" json:"mode,omitempty" validate:"regexp=^(ro|rw)?$"`
Unit string `yaml:"unit,omitempty" json:"unit,omitempty"`
Visitor PropertyVisitor `yaml:"visitor,omitempty" json:"visitor,omitempty"`
}
type PropertyVisitor struct {
Modbus *ModbusVisitor `yaml:"modbus,omitempty" json:"modbus,omitempty"`
Opcua *OpcuaVisitor `yaml:"opcua,omitempty" json:"opcua,omitempty"`
IEC104 *IEC104Visitor `yaml:"iec104,omitempty" json:"iec104,omitempty"`
Custom *CustomVisitor `yaml:"custom,omitempty" json:"custom,omitempty"`
}
type CustomVisitor string
- Name:属性的名字
- Type:属性的数据类型
- Mode:标识该属性是读写或只读
- Vistor:属性的扩展字段,驱动使用该信息向子设备读取写入数据
-
PropertyVisitor
- Modbus、Opcua、IEC104:是系统驱动使用,开发自定义驱动无需使用
- Custom:该字段也以字符串保存属性的扩展信息,驱动根据实际进行相应解析。即2.1.3中的设备点表信息中的采集配置。
子设备影子结构体 DeviceShadow
type DeviceShadow struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Report v1.Report `yaml:"report,omitempty" json:"report,omitempty"`
Desire v1.Desire `yaml:"desire,omitempty" json:"desire,omitempty"`
}
type Report map[string]interface{}
type Desire map[string]interface{}
- Name:子设备名字
- Report:驱动读取子设备的上报数据。
- Desire:云端设置的子设备数据,该字段仅针对子设备读写属性。
事件结构体 Event
type Event struct {
Type string yaml:"type,omitempty" json:"type,omitempty"
Payload interface{} yaml:"payload,omitempty" json:"payload,omitempty"
}
- Type:事件类型
- Payload:事件的具体数据
系统接口
// 获取驱动配置
GetDriverConfig() string
// 获取所有子设备的信息
GetAllDevices() []DeviceInfo
// 获取某个子设备的信息
GetDevice(device string) (*DeviceInfo, error)
// 获取所有子设备的测点/物模型信息
GetAllDeviceModels() map[string][]DeviceProperty
// 获取某个子设备的测点/物模型信息
GetDeviceModel(device *DeviceInfo) ([]DeviceProperty, error)
// 获取所有子设备的接入模板
GetAllAccessTemplates() map[string]AccessTemplate
// 获取某个子设备的接入模板
GetAccessTemplates(device *DeviceInfo) (*AccessTemplate, error)
// 获取子设备数据,通过该方法向Get主题发送请求消息,需要手动订阅GetResponse主题获取返回结果
GetDeviceProperties(device *DeviceInfo) (*DeviceShadow, error)
// 上报子设备在线
Online(device *DeviceInfo) error
// 上报子设备离线
Offline(device *DeviceInfo) error
// 上报子设备测点数据
ReportDeviceProperties(*DeviceInfo, v1.Report) error
// 上报子设备事件
ReportDeviceEvents(*DeviceInfo, v1.EventReport) error
// 注册云端置数回调函数
RegisterDeltaCallback(cb DeltaCallback) error
type DeltaCallback func(*DeviceInfo, v1.Delta) error
// 注册云端事件下发回调函数
RegisterEventCallback(cb EventCallback) error
type EventCallback func(*DeviceInfo, *Event) error
// 注册云端召测回调函数
RegisterPropertyGetCallback(cb PropertyGetCallback) error
type PropertyGetCallback func(*DeviceInfo, []string) error
开发示例
开发一个针对http设备模拟器的自定义驱动 BIE自定义驱动SDK:https://github.com/baetyl/baetyl-go
http设备模拟器
镜像地址
registry.baidubce.com/baetyl-test/device-simulator:v0.1.0
模拟行为
- 当计数器开关关闭时,每次获取随机值会得到随机的int32数值,计数器值为0
- 当计数器开关打开时,每次获取随机值会得到随机的int32数值,计数器值每隔1秒加1, 加到100000后重置为0,重新开始 设备点位
点位名称 | 点位标识符 | 数据类型 |
---|---|---|
计数器开关 | switch | bool |
计数器值 | counter | int32 |
随机值 | random | int32 |
接口
路径 | 方法 | request body | response body |
---|---|---|---|
/switch | GET | 无 | {"switch": true, "status": "success"} |
/switch | PUT | {"switch", false} | {"status": "success"} |
/counter | GET | 无 | {"counter": 32, "status": "success"} |
/random | GET | 无 | {"random": 12, "status": "success"} |
核心代码
主函数
main.go
package main
import dm "github.com/baetyl/baetyl-go/v2/dmcontext"
func main() {
dm.Run(func(ctx dm.Context) error {
d, err := newDriver(ctx)
if err != nil {
return err
}
d.start()
defer d.stop()
ctx.Wait()
return nil
})
}
驱动配置和子设备配置
device_config.go
package config
type DeviceConfig struct {
Channel string `yaml:"channel" json:"channel"`
MachineNum string `yaml:"machineNum" json:"machineNum"`
Properties []Property `yaml:"properties" json:"properties"`
}
type Property struct {
Name string `yaml:"name" json:"name"`
Path string `yaml:"path" json:"path"`
}
driver_config.go
package config
import "time"
type DriverConfig struct {
Channels []Channel `yaml:"channels" json:"channels"`
}
type Channel struct {
Name string `yaml:"name" json:"name"`
Address string `yaml:"address" json:"address"`
Interval time.Duration `yaml:"interval" json:"interval" default:"30s"`
}
配置解析
driver.go
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
dm "github.com/baetyl/baetyl-go/v2/dmcontext"
"github.com/baetyl/baetyl-go/v2/errors"
"github.com/baetyl/baetyl-go/v2/log"
"github.com/baetyl/baetyl-go/v2/spec/v1"
"gopkg.in/yaml.v2"
"custom-driver/config"
)
type driver struct {
ctx dm.Context
channels map[string]config.Channel
devCfgs map[string]config.DeviceConfig
log *log.Logger
}
func newDriver(ctx dm.Context) (*driver, error) {
// 解析驱动配置
var driverCfg config.DriverConfig
if err := yaml.Unmarshal([]byte(ctx.GetDriverConfig()), &driverCfg); err != nil {
return nil, err
}
channels := make(map[string]config.Channel)
for _, channel := range driverCfg.Channels {
channels[channel.Name] = channel
}
// 解析各个子设备配置
devCfgs := make(map[string]config.DeviceConfig)
for _, dev := range ctx.GetAllDevices() {
var devCfg config.DeviceConfig
if err := yaml.Unmarshal([]byte(*dev.AccessConfig.Custom), &devCfg); err != nil {
return nil, err
}
accessTpl, err := ctx.GetAccessTemplates(&dev)
if err != nil {
return nil, err
}
var properties []config.Property
for _, prop := range accessTpl.Properties {
cfg := config.Property{Name: prop.Name}
if err := yaml.Unmarshal([]byte(*prop.Visitor.Custom), &cfg); err != nil {
return nil, err
}
properties = append(properties, cfg)
}
devCfg.Properties = properties
devCfgs[dev.Name] = devCfg
// ensure device is ready, then tell cloud all the devices is online
if err := ctx.Online(&dev); err != nil {
return nil, err
}
}
d := &driver{
ctx: ctx,
channels: channels,
devCfgs: devCfgs,
log: ctx.Log().With(log.Any("module", "custom driver")),
}
// 注册数据通知、事件、召测回调函数
if err := ctx.RegisterDeltaCallback(d.DeltaCallback); err != nil {
return nil, err
}
if err := ctx.RegisterEventCallback(d.EventCallback); err != nil {
return nil, err
}
if err := ctx.RegisterPropertyGetCallback(d.PropertyGetCallback); err != nil {
return nil, err
}
return d, nil
}
定时读取设备数据上报
driver.go
func (d *driver) start() {
for _, dev := range d.ctx.GetAllDevices() {
go d.running(&dev)
}
}
func (d *driver) stop() {
}
// 针对多个子设备开始定时读数据
func (d *driver) running(dev *dm.DeviceInfo) {
cfg, ok := d.devCfgs[dev.Name]
if !ok {
d.log.Error("device config not exist", log.Any("device", cfg))
return
}
channel, ok := d.channels[cfg.Channel]
if !ok {
d.log.Error("channel not exist", log.Any("channel", cfg.Channel))
return
}
ticker := time.NewTicker(channel.Interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := d.execute(dev)
if err != nil {
d.log.Error("failed to execute", log.Error(err))
}
case <-d.ctx.WaitChan():
d.log.Warn("task of device stopped", log.Any("device", dev))
return
}
}
}
// 读取数据后通过上报函数发送数据至baetyl-broker
func (d *driver) execute(dev *dm.DeviceInfo) error {
cfg, _ := d.devCfgs[dev.Name]
r := v1.Report{}
for _, p := range cfg.Properties {
val, err := d.read(&cfg, &p)
if err != nil {
return err
}
r[p.Name] = val
}
if err := d.ctx.ReportDeviceProperties(dev, r); err != nil {
return err
}
return nil
}
// 读取子设备数据
func (d *driver) read(dev *config.DeviceConfig, prop *config.Property) (interface{}, error) {
resp, err := http.Get(d.channels[dev.Channel].Address + prop.Path)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
if val, ok := result[prop.Name]; ok {
return val, nil
} else {
return nil, fmt.Errorf("failed to get property: %s", prop.Name)
}
}
回调函数
driver.go
// 数据通知回调函数,标识云端写入数据至子设备
func (d *driver) DeltaCallback(dev *dm.DeviceInfo, prop v1.Delta) error {
devConfig, _ := d.devCfgs[dev.Name]
for _, cfg := range devConfig.Properties {
for k, v := range prop {
if k == cfg.Name {
pld, err := json.Marshal(map[string]interface{}{cfg.Name: v})
if err != nil {
return err
}
req, err := http.NewRequest("PUT", d.channels[devConfig.Channel].Address+cfg.Path, bytes.NewBuffer(pld))
if err != nil {
return err
}
_, err = http.DefaultClient.Do(req)
if err != nil {
return err
}
}
}
}
return nil
}
// 事件回调函数
func (d *driver) EventCallback(dev *dm.DeviceInfo, event *dm.Event) error {
switch event.Type {
// 即时上报事件
case dm.TypeReportEvent:
if err := d.execute(dev); err != nil {
return err
}
default:
return errors.New("event type not supported yet")
}
return nil
}
// 召测回调函数
func (d *driver) PropertyGetCallback(dev *dm.DeviceInfo, properties []string) error {
return d.execute(dev)
}
BIE云端配置
产品
『子设备管理』-『产品』
子设备
『子设备管理』-『子设备』
接入模板
『子设备管理』-『接入模板』
驱动配置
『边缘节点』-『子设备管理』-『设备驱动』-『配置』
子设备配置
『边缘节点』-『子设备管理』-『子设备』-『配置』