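1. Overview
DeviceController is a cloud-side component of KubeEdge and is responsible for device management. KubeEdge uses the CRD mechanism provided by Kubernetes to abstract real physical devices: it defines a custom resource named Device that describes a device's metadata and status. DeviceController, as the name suggests, is the controller for this resource and is responsible for synchronizing device information between cloud and edge. In the implementation, DeviceController is split into two parts that run in separate goroutines, downstream and upstream. downstream watches the Kubernetes API Server and syncs device updates from the cloud to the edge; upstream subscribes to messages from the edge and syncs them to the API Server.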
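Before analyzing this part of the source code in detail, two concepts need to be clear: DeviceModel and DeviceInstance. In KubeEdge, DeviceController uses these two concepts to abstract devices.
- DeviceModel: describes the properties of a device and defines how those properties are accessed (property visitors). A DeviceModel can be thought of as a template for a group of devices. For its detailed design, see the DeviceModel design document.
- DeviceInstance: represents one real device instance. A device instance is created by referencing a DeviceModel. Its Device Spec describes the desired state of the device, while its Device Status describes the actual state. For its detailed design, see the DeviceInstance design document.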
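To make the template/instance relationship concrete, here is a minimal sketch in Go. It is not the real v1alpha2 API: the type and field names (Property, DeviceSpec.Desired, DeviceStatus.Reported, OutOfSync) are simplified assumptions chosen only to show a model acting as a template, an instance referencing it, and the split between desired and actual state.

```go
package main

import (
	"fmt"
	"sort"
)

// Property is one attribute declared by a model (simplified for illustration).
type Property struct {
	Name     string
	Type     string
	ReadOnly bool
}

// DeviceModel is a template shared by a group of devices.
type DeviceModel struct {
	Name       string
	Properties []Property
}

// DeviceSpec holds the desired state of one device (hypothetical layout).
type DeviceSpec struct {
	ModelRef string            // name of the DeviceModel this instance is built from
	Desired  map[string]string // property name -> desired value
}

// DeviceStatus holds the actual state reported by the device (hypothetical layout).
type DeviceStatus struct {
	Reported map[string]string // property name -> reported value
}

// DeviceInstance represents one real device.
type DeviceInstance struct {
	Name   string
	Spec   DeviceSpec
	Status DeviceStatus
}

// OutOfSync lists, in sorted order, the properties whose reported value
// is missing or differs from the desired value.
func OutOfSync(d DeviceInstance) []string {
	var names []string
	for name, want := range d.Spec.Desired {
		if got, ok := d.Status.Reported[name]; !ok || got != want {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	model := DeviceModel{
		Name: "thermostat",
		Properties: []Property{
			{Name: "temperature", Type: "int"},
			{Name: "mode", Type: "string"},
		},
	}
	dev := DeviceInstance{
		Name: "thermostat-01",
		Spec: DeviceSpec{
			ModelRef: model.Name,
			Desired:  map[string]string{"temperature": "21", "mode": "auto"},
		},
		Status: DeviceStatus{
			Reported: map[string]string{"temperature": "19", "mode": "auto"},
		},
	}
	fmt.Println(dev.Name, "uses model", dev.Spec.ModelRef, "- out of sync:", OutOfSync(dev))
}
```

Keeping desired and reported values side by side is what lets a controller decide which properties still need to be pushed to the edge.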
2. Source code analysis
2.1. Code entry point
Parts of the code are omitted to keep this short.
DownstreamController is defined as follows. Its job is to watch the Kubernetes API Server and send changes to the edge.
    // DownstreamController watch kubernetes api server and send change to edge
    type DownstreamController struct {
        kubeClient         *kubernetes.Clientset
        messageLayer       messagelayer.MessageLayer
        deviceManager      *manager.DeviceManager
        deviceModelManager *manager.DeviceModelManager
        configMapManager   *manager.ConfigMapManager
        crdClient          *rest.RESTClient
    }
UpstreamController is defined as follows. Its job is to subscribe to messages from the edge and sync them to the Kubernetes API Server.
    // UpstreamController subscribe messages from edge and sync to k8s api server
    type UpstreamController struct {
        crdClient    *rest.RESTClient
        messageLayer messagelayer.MessageLayer
        // message channel
        deviceStatusChan chan model.Message
        // downstream controller to update device status in cache
        dc *DownstreamController
    }
The module entry point is cloud/pkg/devicecontroller/devicecontroller.go. The main logic of DeviceController is as follows:
    func (dc *DeviceController) Start() {
        downstream, err := controller.NewDownstreamController()
        ...
        upstream, err := controller.NewUpstreamController(downstream)
        ...
        downstream.Start()
        // wait for downstream controller to start and load deviceModels and devices
        // TODO think about sync
        time.Sleep(1 * time.Second)
        upstream.Start()
    }
2.2. downstream
downstream.Start()
    // Start DownstreamController
    func (dc *DownstreamController) Start() error {
        klog.Info("Start downstream devicecontroller")
        go dc.syncDeviceModel()
        // Wait for adding all device model
        // TODO need to think about sync
        time.Sleep(1 * time.Second)
        go dc.syncDevice()
        return nil
    }
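As shown above, the start function of the downstream controller is very simple: two separate goroutines run syncDeviceModel() and syncDevice(). Because a DeviceModel must exist before any Device instance that references it, the start order of syncDeviceModel() and syncDevice() must not change. The current implementation, however, simply sleeps for one second to order the two. This is a crude approach; the code marks it with a TODO, and it will need to be improved later.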
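One common way to replace a fixed sleep with an explicit ordering guarantee is a "ready" channel that the first goroutine closes once its initial work is done. The sketch below illustrates that general Go pattern only; it is not how KubeEdge resolves its TODO, and startInOrder and the recorded step names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// startInOrder launches two goroutines that stand in for syncDeviceModel and
// syncDevice, and returns the order in which their first steps ran.
func startInOrder() []string {
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	record := func(step string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, step)
	}

	// Closed by the model goroutine once its initial load has finished.
	modelsReady := make(chan struct{})

	wg.Add(2)
	go func() { // stand-in for syncDeviceModel
		defer wg.Done()
		record("device models loaded")
		close(modelsReady)
		// A real loop would keep processing events here.
	}()
	go func() { // stand-in for syncDevice
		defer wg.Done()
		<-modelsReady // blocks until the models are loaded, however long that takes
		record("device sync started")
	}()

	wg.Wait()
	return order
}

func main() {
	fmt.Println(startInOrder())
}
```

Unlike a one-second sleep, the channel neither waits longer than necessary nor breaks when the initial load takes more than a second.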
syncDeviceModel()
    // syncDeviceModel is used to get events from informer
    func (dc *DownstreamController) syncDeviceModel() {
        for {
            select {
            case <-beehiveContext.Done():
                klog.Info("stop syncDeviceModel")
                return
            case e := <-dc.deviceModelManager.Events():
                deviceModel, ok := e.Object.(*v1alpha2.DeviceModel)
                ...
                switch e.Type {
                case watch.Added:
                    dc.deviceModelAdded(deviceModel)
                case watch.Deleted:
                    dc.deviceModelDeleted(deviceModel)
                case watch.Modified:
                    dc.deviceModelUpdated(deviceModel)
                default:
                    klog.Warningf("deviceModel event type: %s unsupported", e.Type)
                }
            }
        }
    }
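This code is quite clear: it takes deviceModel events from deviceModelManager and, depending on the event type, triggers the add, delete, or update handling for the deviceModel:
- deviceModelAdded: only adds a record to the local cache; it does not actually create a deviceModel instance.
- deviceModelUpdated: updates the cache and calls updateAllConfigMaps(), although the latter is not implemented yet.
- deviceModelDeleted: currently only removes the record from the local cache. A TODO notes that all devices associated with this deviceModel should also be deleted, but that is not implemented yet.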
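The event loop and its cache-only handlers can be sketched in a self-contained form. This is a simplified illustration, not KubeEdge code: Event, Model, syncModels, and replay are hypothetical names, the event type strings are local constants, and returning when the channel is closed is an addition made so the example terminates.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

// Model is a stand-in for a device model object.
type Model struct {
	Name    string
	Version int
}

type Event struct {
	Type   EventType
	Object Model
}

// syncModels applies events to the cache until ctx is cancelled or the
// event channel is closed.
func syncModels(ctx context.Context, events <-chan Event, cache *sync.Map) {
	for {
		select {
		case <-ctx.Done():
			return
		case e, ok := <-events:
			if !ok {
				return
			}
			switch e.Type {
			case Added, Modified:
				cache.Store(e.Object.Name, e.Object) // cache only, nothing is created
			case Deleted:
				cache.Delete(e.Object.Name)
			default:
				fmt.Printf("event type %s unsupported\n", e.Type)
			}
		}
	}
}

// replay feeds a fixed list of events through syncModels and returns the
// resulting cache as name -> version.
func replay(events []Event) map[string]int {
	ch := make(chan Event, len(events))
	for _, e := range events {
		ch <- e
	}
	close(ch)

	var cache sync.Map
	syncModels(context.Background(), ch, &cache)

	out := make(map[string]int)
	cache.Range(func(k, v interface{}) bool {
		out[k.(string)] = v.(Model).Version
		return true
	})
	return out
}

func main() {
	fmt.Println(replay([]Event{
		{Type: Added, Object: Model{Name: "sensor", Version: 1}},
		{Type: Modified, Object: Model{Name: "sensor", Version: 2}},
		{Type: Added, Object: Model{Name: "lamp", Version: 1}},
		{Type: Deleted, Object: Model{Name: "lamp"}},
	}))
}
```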
syncDevice()
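The logic of syncDevice() mirrors syncDeviceModel(): it takes device events from deviceManager and, depending on the event type, triggers the add, delete, or update handling for the device:
- deviceAdded: considerably more complex than deviceModelAdded(). The code is as follows: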
    // deviceAdded creates a device, adds in deviceManagers map, send a message to edge node if node selector is present.
    func (dc *DownstreamController) deviceAdded(device *v1alpha2.Device) {
        dc.deviceManager.Device.Store(device.Name, device) // cache the device
        if len(device.Spec.NodeSelector.NodeSelectorTerms) != 0 &&
            len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions) != 0 &&
            len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values) != 0 {
            dc.addToConfigMap(device)          // create the ConfigMap from the Device API object, i.e. write the device configuration into it
            edgeDevice := createDevice(device) // build a types.Device from the Device API object
            msg := model.NewMessage("")
            resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "membership", "")
            ...
            msg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)
            content := types.MembershipUpdate{AddDevices: []types.Device{ // message content
                edgeDevice,
            }}
            content.EventID = uuid.NewV4().String()
            content.Timestamp = time.Now().UnixNano() / 1e6
            msg.Content = content
            err = dc.messageLayer.Send(*msg) // send the message
            ...
        }
    }
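First, just as in deviceModelAdded(), the device is cached in deviceManager.
Next, the code checks whether the device is bound to an edge node; if the node selector is non-empty, it continues.
It then calls addToConfigMap() to create a ConfigMap. The ConfigMap plays an important role: on the edge, the mapper obtains all device-related configuration (protocol, protocolVisitor, and so on) from it. So when a device is added in the cloud, the corresponding ConfigMap has to be created or updated. Note that addToConfigMap() does not push the ConfigMap to the edge node directly; it only updates it in the cloud, and edgecontroller syncs it to the edge node once it sees the ConfigMap change through its watch.
Then createDevice(device) is called, which converts the standard Device API object into a types.Device. Note the difference between the two! The former is a custom resource defined through the Kubernetes CRD mechanism and is a standard Kubernetes API object; the latter is used for cloud-edge communication (that is, message transfer between cloudhub and edgehub). In other words, the Device object that the edge receives is not the standard Device API object. Devices are handled differently from built-in resources such as Pod and Service: for built-in resources the complete API object is sent to the edge, whereas for devices a number of additional structs (mainly Device, MsgTwin, and MsgAttr) are defined for transferring device data between cloud and edge.
Finally, the message msg is created and sent to the edge. Note that msg.Content holds a types.MembershipUpdate{}, which contains the list of added devices. This differs from edgecontroller, whose messages to the edge carry objects such as Pod and ConfigMap as their content.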
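The conversion and message-building steps described above can be sketched as follows. This is a hedged illustration, not the KubeEdge types: APIDevice, EdgeDevice, MembershipUpdate, toEdgeDevice, and buildMembershipUpdate are simplified, hypothetical stand-ins, and the node selector is flattened to a plain list of node names.

```go
package main

import (
	"fmt"
	"time"
)

// APIDevice stands in for the CRD-backed Device API object (heavily simplified).
type APIDevice struct {
	Name      string
	Namespace string
	ModelRef  string
	NodeNames []string // flattened stand-in for the node selector values
}

// EdgeDevice stands in for the lighter struct used in cloud-edge messages.
type EdgeDevice struct {
	ID   string
	Name string
}

// MembershipUpdate stands in for the message content listing added devices.
type MembershipUpdate struct {
	EventID    string
	Timestamp  int64 // milliseconds since the Unix epoch
	AddDevices []EdgeDevice
}

// toEdgeDevice keeps only what the edge needs; the full API object is not sent.
func toEdgeDevice(d APIDevice) EdgeDevice {
	return EdgeDevice{ID: d.Name, Name: d.Name}
}

// buildMembershipUpdate returns the target node and the message content.
// ok is false when the device is not bound to any node, in which case
// nothing should be sent.
func buildMembershipUpdate(d APIDevice, eventID string, timestampMs int64) (node string, content MembershipUpdate, ok bool) {
	if len(d.NodeNames) == 0 {
		return "", MembershipUpdate{}, false
	}
	content = MembershipUpdate{
		EventID:    eventID,
		Timestamp:  timestampMs,
		AddDevices: []EdgeDevice{toEdgeDevice(d)},
	}
	return d.NodeNames[0], content, true
}

func main() {
	dev := APIDevice{Name: "thermometer", Namespace: "default", ModelRef: "temp-model", NodeNames: []string{"edge-node-1"}}
	nowMs := time.Now().UnixNano() / 1e6 // same millisecond conversion as the code above
	if node, content, ok := buildMembershipUpdate(dev, "event-1", nowMs); ok {
		fmt.Printf("send to %s: %+v\n", node, content)
	}
}
```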
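- deviceDeleted: the logic for deleting a device is essentially the same as for adding one, so it is not repeated here.
- deviceUpdated: updating a device is also fairly involved. The code is as follows: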
    // deviceUpdated updates the map, check if device is actually updated.
    // If nodeSelector is updated, call add device for newNode, deleteDevice for old Node.
    // If twin is updated, send twin update message to edge
    func (dc *DownstreamController) deviceUpdated(device *v1alpha2.Device) {
        value, ok := dc.deviceManager.Device.Load(device.Name) // load the old device from the cache
        dc.deviceManager.Device.Store(device.Name, device)
        if ok {
            cachedDevice := value.(*v1alpha2.Device)
            if isDeviceUpdated(cachedDevice, device) {
                // if node selector updated delete from old node and create in new node
                if isNodeSelectorUpdated(cachedDevice.Spec.NodeSelector, device.Spec.NodeSelector) {
                    dc.deviceAdded(device)
                    deletedDevice := &v1alpha2.Device{ObjectMeta: cachedDevice.ObjectMeta,
                        Spec:     cachedDevice.Spec,
                        Status:   cachedDevice.Status,
                        TypeMeta: device.TypeMeta,
                    }
                    dc.deviceDeleted(deletedDevice)
                } else {
                    // update config map if spec, data or twins changed
                    if isProtocolConfigUpdated(&cachedDevice.Spec.Protocol, &device.Spec.Protocol) ||
                        isDeviceStatusUpdated(&cachedDevice.Status, &device.Status) ||
                        isDeviceDataUpdated(&cachedDevice.Spec.Data, &device.Spec.Data) {
                        dc.updateConfigMap(device)
                    }
                    // update twin properties
                    if isDeviceStatusUpdated(&cachedDevice.Status, &device.Status) {
                        // TODO: add an else if condition to check if DeviceModelReference has changed, if yes whether deviceModelReference exists
                        twin := make(map[string]*types.MsgTwin)
                        addUpdatedTwins(device.Status.Twins, twin, device.ResourceVersion)
                        addDeletedTwins(cachedDevice.Status.Twins, device.Status.Twins, twin, device.ResourceVersion)
                        msg := model.NewMessage("")
                        resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "device/"+device.Name+"/twin/cloud_updated", "")
                        if err != nil {
                            klog.Warningf("Built message resource failed with error: %s", err)
                            return
                        }
                        msg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)
                        content := types.DeviceTwinUpdate{Twin: twin}
                        content.EventID = uuid.NewV4().String()
                        content.Timestamp = time.Now().UnixNano() / 1e6
                        msg.Content = content
                        err = dc.messageLayer.Send(*msg)
                        ...
                    }
                }
            }
        } else {
            // If device not present in device map means it is not modified and added.
            dc.deviceAdded(device)
        }
    }
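First, the old device is loaded from the cache, and isDeviceUpdated(cachedDevice, device) checks whether the new device differs from the old one.
If the NodeSelector bound to the device has changed, the new device is added (on the new node) and the old device is deleted (from the old node).
If the device's spec, data, or twins have changed (in the code: the protocol config, the status, or the data section), the ConfigMap is updated. As mentioned earlier, the mapper on the edge relies on the ConfigMap for all device information.
In addition, if the device twin has changed, a message must also be sent to the edge, with DeviceTwinUpdate{Twin: twin} as its content.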
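The branching in deviceUpdated() can be summarized as a pure decision function. The sketch below is an assumption-laden simplification, not the KubeEdge implementation: Device, Action, and planUpdate are hypothetical, the node selector and protocol config are reduced to strings, and the data section is left out.

```go
package main

import (
	"fmt"
	"reflect"
)

// Device is a reduced stand-in for the cached and incoming Device objects.
type Device struct {
	Node     string            // stand-in for the node selector
	Protocol string            // stand-in for the protocol config
	Twins    map[string]string // property name -> desired value
}

type Action string

const (
	AddToNewNode      Action = "add-to-new-node"
	DeleteFromOldNode Action = "delete-from-old-node"
	UpdateConfigMap   Action = "update-configmap"
	SendTwinUpdate    Action = "send-twin-update"
)

// planUpdate mirrors the order of checks described above: a node change
// wins over everything else; otherwise the ConfigMap and twin checks are
// evaluated independently.
func planUpdate(old, cur Device) []Action {
	if reflect.DeepEqual(old, cur) {
		return nil // nothing changed
	}
	if old.Node != cur.Node {
		return []Action{AddToNewNode, DeleteFromOldNode}
	}
	var actions []Action
	twinsChanged := !reflect.DeepEqual(old.Twins, cur.Twins)
	if old.Protocol != cur.Protocol || twinsChanged {
		actions = append(actions, UpdateConfigMap)
	}
	if twinsChanged {
		actions = append(actions, SendTwinUpdate)
	}
	return actions
}

func main() {
	old := Device{Node: "edge-1", Protocol: "modbus", Twins: map[string]string{"temperature": "20"}}
	cur := Device{Node: "edge-1", Protocol: "modbus", Twins: map[string]string{"temperature": "22"}}
	fmt.Println(planUpdate(old, cur))
}
```

Separating the decision from the side effects like this also makes the branching easy to unit test, which the original method, with its inline message sending, is not.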
2.3. upstream
upstream.Start()
    // Start UpstreamController
    func (uc *UpstreamController) Start() error {
        klog.Info("Start upstream devicecontroller")
        uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
        go uc.dispatchMessage()
        for i := 0; i < int(config.Config.Buffer.UpdateDeviceStatus); i++ {
            go uc.updateDeviceStatus()
        }
        return nil
    }
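The start function of upstream is also very simple. It does just two things: it starts one goroutine that dispatches messages, and it starts a group of goroutines that update the Device API object (specifically its Device.Status part).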
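The structure here is a buffered channel drained by a fixed number of workers, with the worker count equal to the buffer size. A minimal, self-contained sketch of that pattern follows; processAll is a hypothetical helper, and closing the channel to stop the workers is an addition for the example (the original loops stop on context cancellation instead).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processAll pushes msgs through a buffered channel that is drained by a
// fixed number of worker goroutines, and returns how many were handled.
// workers must be greater than zero.
func processAll(workers int, msgs []string) int {
	queue := make(chan string, workers) // buffer size equals worker count, as in Start()
	var handled int64
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() { // stand-in for updateDeviceStatus
			defer wg.Done()
			for range queue {
				atomic.AddInt64(&handled, 1)
			}
		}()
	}

	// Stand-in for dispatchMessage: a single producer feeding the queue.
	for _, m := range msgs {
		queue <- m
	}
	close(queue)

	wg.Wait()
	return int(handled)
}

func main() {
	msgs := []string{"twin-a", "twin-b", "twin-c", "twin-d", "twin-e"}
	fmt.Println("handled:", processAll(2, msgs))
}
```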
dispatchMessage()
    func (uc *UpstreamController) dispatchMessage() {
        for {
            select {
            case <-beehiveContext.Done():
                klog.Info("Stop dispatchMessage")
                return
            default:
            }
            msg, err := uc.messageLayer.Receive()
            ...
            resourceType, err := messagelayer.GetResourceType(msg.GetResource())
            ...
            switch resourceType {
            case constants.ResourceTypeTwinEdgeUpdated:
                uc.deviceStatusChan <- msg
            default:
                ...
            }
        }
    }
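It first receives a message and then forwards it according to the message's resourceType; that is, messages carrying twin updates reported by devicetwin on the edge are forwarded to deviceStatusChan.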
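Routing by resource type can be sketched as below. The resource string layout and the "twin/edge_updated" suffix are assumptions made for illustration and may not match the real messagelayer helpers; Message, resourceType, and dispatch are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// typeTwinEdgeUpdated is an assumed resource type marker for this sketch.
const typeTwinEdgeUpdated = "twin/edge_updated"

// Message is a reduced stand-in for a cloud-edge message.
type Message struct {
	Resource string
	Content  string
}

// resourceType extracts the type from a resource string such as
// "node/<nodeID>/device/<deviceID>/twin/edge_updated" (assumed layout).
func resourceType(resource string) (string, error) {
	if strings.HasSuffix(resource, "/"+typeTwinEdgeUpdated) {
		return typeTwinEdgeUpdated, nil
	}
	return "", fmt.Errorf("unknown resource type in %q", resource)
}

// dispatch forwards twin updates to statusChan and returns how many
// messages were dropped because their type was not recognized.
func dispatch(msgs []Message, statusChan chan<- Message) int {
	dropped := 0
	for _, m := range msgs {
		t, err := resourceType(m.Resource)
		if err != nil {
			dropped++
			continue
		}
		switch t {
		case typeTwinEdgeUpdated:
			statusChan <- m
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	msgs := []Message{
		{Resource: "node/edge-1/device/thermometer/twin/edge_updated", Content: "21"},
		{Resource: "node/edge-1/membership", Content: "ignored"},
	}
	statusChan := make(chan Message, len(msgs))
	dropped := dispatch(msgs, statusChan)
	fmt.Println("forwarded:", len(statusChan), "dropped:", dropped)
}
```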
updateDeviceStatus()
    func (uc *UpstreamController) updateDeviceStatus() {
        for {
            select {
            case <-beehiveContext.Done():
                klog.Info("Stop updateDeviceStatus")
                return
            case msg := <-uc.deviceStatusChan:
                ...
                msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
                ...
                deviceID, err := messagelayer.GetDeviceID(msg.GetResource())
                ...
                device, ok := uc.dc.deviceManager.Device.Load(deviceID)
                ...
                cacheDevice, ok := device.(*v1alpha2.Device)
                ...
                deviceStatus := &DeviceStatus{Status: cacheDevice.Status}
                for twinName, twin := range msgTwin.Twin {
                    for i, cacheTwin := range deviceStatus.Status.Twins {
                        if twinName == cacheTwin.PropertyName && twin.Actual != nil && twin.Actual.Value != nil {
                            reported := v1alpha2.TwinProperty{}
                            reported.Value = *twin.Actual.Value
                            reported.Metadata = make(map[string]string)
                            if twin.Actual.Metadata != nil {
                                reported.Metadata["timestamp"] = strconv.FormatInt(twin.Actual.Metadata.Timestamp, 10)
                            }
                            if twin.Metadata != nil {
                                reported.Metadata["type"] = twin.Metadata.Type
                            }
                            deviceStatus.Status.Twins[i].Reported = reported
                            break
                        }
                    }
                }
                // Store the status in cache so that when update is received by informer, it is not processed by downstream controller
                cacheDevice.Status = deviceStatus.Status
                uc.dc.deviceManager.Device.Store(deviceID, cacheDevice)
                body, err := json.Marshal(deviceStatus)
                ...
                result := uc.crdClient.Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(deviceID).Body(body).Do(context.Background())
                ...
                // send confirm message to edge twin
                resMsg := model.NewMessage(msg.GetID())
                nodeID, err := messagelayer.GetNodeID(msg)
                ...
                resource, err := messagelayer.BuildResource(nodeID, "twin", "")
                ...
                resMsg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.ResponseOperation)
                resMsg.Content = "OK"
                err = uc.messageLayer.Response(*resMsg)
                ...
            }
        }
    }
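updateDeviceStatus() merges the reported twin values into the cached device, patches the Device API object (specifically its Device.Status part), and then sends a confirmation message back to the edge.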
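The merge-then-patch step can be sketched with the standard library alone. The types and JSON field names below (Twin, TwinProperty, Report, statusPatch) are simplified assumptions rather than the v1alpha2 definitions, and the sketch stops at producing the patch body instead of calling the API Server.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// TwinProperty and Twin are reduced stand-ins for the device twin types.
type TwinProperty struct {
	Value    string            `json:"value"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

type Twin struct {
	PropertyName string       `json:"propertyName"`
	Reported     TwinProperty `json:"reported"`
}

type Status struct {
	Twins []Twin `json:"twins"`
}

// statusPatch wraps the status so the patch only touches the status field.
type statusPatch struct {
	Status Status `json:"status"`
}

// Report is what the edge sent for one property (hypothetical shape).
type Report struct {
	Value       *string
	TimestampMs int64
}

// mergeReported returns a copy of status in which every twin that has a
// non-nil reported value is updated; unknown properties are ignored.
func mergeReported(status Status, reports map[string]Report) Status {
	twins := append([]Twin(nil), status.Twins...) // copy so the caller's slice is untouched
	for name, r := range reports {
		if r.Value == nil {
			continue
		}
		for i := range twins {
			if twins[i].PropertyName == name {
				twins[i].Reported = TwinProperty{
					Value:    *r.Value,
					Metadata: map[string]string{"timestamp": strconv.FormatInt(r.TimestampMs, 10)},
				}
				break
			}
		}
	}
	return Status{Twins: twins}
}

// buildPatch marshals the merged status into a JSON merge patch body.
func buildPatch(status Status) ([]byte, error) {
	return json.Marshal(statusPatch{Status: status})
}

func main() {
	value := "21"
	cached := Status{Twins: []Twin{{PropertyName: "temperature"}}}
	merged := mergeReported(cached, map[string]Report{
		"temperature": {Value: &value, TimestampMs: 1700000000000},
	})
	body, err := buildPatch(merged)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

Because a JSON merge patch replaces arrays as a whole rather than merging them element by element, the body carries the complete twins list and not only the entries that changed.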
2.4. Summary
The DeviceController source code covered above is organized as follows: