diff --git a/go.sum b/go.sum index 93ed602..b67b4bf 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I= github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= +github.com/go-resty/resty/v2 v2.4.0 h1:s6TItTLejEI+2mn98oijC5w/Rk2YU+OA6x0mnZN6r6k= +github.com/go-resty/resty/v2 v2.4.0/go.mod h1:B88+xCTEwvfD94NOuE6GS1wMlnoKNY8eEiNizfNwOwA= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= @@ -14,8 +16,14 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/iot_device.go b/iot_device.go index b4e1bb2..9c38934 100644 --- a/iot_device.go +++ b/iot_device.go @@ -70,6 +70,7 @@ type iotDevice struct { deviceUpgradeHandler DeviceUpgradeHandler fileUrls map[string]string qos byte + batchSubDeviceSize int } func (device *iotDevice) DisConnect() () { @@ -202,22 +203,43 @@ func (device *iotDevice) DeleteSubDevices(deviceIds []string) bool { func (device *iotDevice) UpdateSubDeviceState(subDevicesStatus SubDevicesStatus) bool { glog.Infof("begin to update sub-devices status") - requestEventService := DataEntry{ - ServiceId: "$sub_device_manager", - EventType: "sub_device_update_status", - EventTime: GetEventTimeStamp(), - Paras: subDevicesStatus, + subDeviceCounts := len(subDevicesStatus.DeviceStatuses) + + batchUpdateSubDeviceState := 0 + if subDeviceCounts%device.batchSubDeviceSize == 0 { + batchUpdateSubDeviceState = subDeviceCounts / device.batchSubDeviceSize + } else { + batchUpdateSubDeviceState = subDeviceCounts/device.batchSubDeviceSize + 1 } - request := Data{ - ObjectDeviceId: device.Id, - Services: []DataEntry{requestEventService}, - } + for i := 0; i < batchUpdateSubDeviceState; i++ { + begin := i * device.batchSubDeviceSize + end := (i + 1) * device.batchSubDeviceSize + if end > subDeviceCounts { + end = subDeviceCounts + } - if token := device.client.Publish(FormatTopic(DeviceToPlatformTopic, device.Id), device.qos, false, Interface2JsonString(request)); - token.Wait() && token.Error() != nil { - glog.Warningf("gateway %s update sub devices status failed", device.Id) - return false + sds := SubDevicesStatus{ + DeviceStatuses: subDevicesStatus.DeviceStatuses[begin:end], + } + + requestEventService := DataEntry{ + ServiceId: "$sub_device_manager", + EventType: "sub_device_update_status", + EventTime: GetEventTimeStamp(), + Paras: sds, + } + + request := Data{ + ObjectDeviceId: device.Id, + Services: []DataEntry{requestEventService}, + } + + if token := device.client.Publish(FormatTopic(DeviceToPlatformTopic, device.Id), device.qos, false, Interface2JsonString(request)); + token.Wait() && token.Error() != nil { + glog.Warningf("gateway %s update sub devices status failed", device.Id) + return false + } } glog.Info("gateway update sub devices status failed", device.Id) @@ -603,9 +625,31 @@ func (device *iotDevice) ReportProperties(properties DeviceProperties) bool { } func (device *iotDevice) BatchReportSubDevicesProperties(service DevicesService) { - if token := device.client.Publish(FormatTopic(GatewayBatchReportSubDeviceTopic, device.Id), device.qos, false, Interface2JsonString(service)); - token.Wait() && token.Error() != nil { - glog.Warningf("device %s batch report sub device properties failed", device.Id) + + subDeviceCounts := len(service.Devices) + + batchReportSubDeviceProperties := 0 + if subDeviceCounts%device.batchSubDeviceSize == 0 { + batchReportSubDeviceProperties = subDeviceCounts / device.batchSubDeviceSize + } else { + batchReportSubDeviceProperties = subDeviceCounts/device.batchSubDeviceSize + 1 + } + + for i := 0; i < batchReportSubDeviceProperties; i++ { + begin := i * device.batchSubDeviceSize + end := (i + 1) * device.batchSubDeviceSize + if end > subDeviceCounts { + end = subDeviceCounts + } + + sds := DevicesService{ + Devices: service.Devices[begin:end], + } + + if token := device.client.Publish(FormatTopic(GatewayBatchReportSubDeviceTopic, device.Id), device.qos, false, Interface2JsonString(sds)); + token.Wait() && token.Error() != nil { + glog.Warningf("device %s batch report sub device properties failed", device.Id) + } } } @@ -645,20 +689,39 @@ func (device *iotDevice) SetPropertyQueryHandler(handler DevicePropertyQueryHand } func CreateIotDevice(id, password, servers string) Device { - return CreateIotDeviceWithQos(id, password, servers, 0) + config := DeviceConfig{ + Id: id, + Password: password, + Servers: servers, + Qos: 0, + } + + return CreateIotDeviceWitConfig(config) } func CreateIotDeviceWithQos(id, password, servers string, qos byte) Device { + config := DeviceConfig{ + Id: id, + Password: password, + Servers: servers, + Qos: qos, + } + + return CreateIotDeviceWitConfig(config) +} + +func CreateIotDeviceWitConfig(config DeviceConfig) Device { device := &iotDevice{} - device.Id = id - device.Password = password - device.Servers = servers + device.Id = config.Id + device.Password = config.Password + device.Servers = config.Servers device.messageHandlers = []MessageHandler{} device.commandHandlers = []CommandHandler{} device.fileUrls = map[string]string{} - device.qos = qos + device.qos = config.Qos + device.batchSubDeviceSize = config.BatchSubDeviceSize return device } diff --git a/iot_device_config.go b/iot_device_config.go new file mode 100644 index 0000000..956723c --- /dev/null +++ b/iot_device_config.go @@ -0,0 +1,9 @@ +package iot + +type DeviceConfig struct { + Id string + Password string + Servers string + Qos byte + BatchSubDeviceSize int +} diff --git a/samples/gateway/gateway_update_status.go b/samples/gateway/gateway_update_status.go index ec05132..ccc48e8 100644 --- a/samples/gateway/gateway_update_status.go +++ b/samples/gateway/gateway_update_status.go @@ -3,11 +3,12 @@ package main import ( "fmt" iot "github.com/ctlove0523/huaweicloud-iot-device-sdk-go" + "strconv" "time" ) func main() { - device := iot.CreateIotDevice("5fdb75cccbfe2f02ce81d4bf_go-mqtt", "123456789", "tls://iot-mqtts.cn-north-4.myhuaweicloud.com:8883") + device := iot.CreateIotDevice("5fdb75cccbfe2f02ce81d4bf_go-sdk", "123456789", "tls://iot-mqtts.cn-north-4.myhuaweicloud.com:8883") device.SetSubDevicesAddHandler(func(devices iot.SubDeviceInfo) { for _, info := range devices.Devices { fmt.Println("handle device add") @@ -23,31 +24,22 @@ func main() { }) device.Init() - TestDeleteSubDevices(device) - time.Sleep(2* time.Second) + TestUpdateSubDeviceState(device) + time.Sleep(200 * time.Second) - //device.SyncAllVersionSubDevices() - - - time.Sleep(time.Hour) } func TestUpdateSubDeviceState(device iot.Device) { - subDevice1 := iot.DeviceStatus{ - DeviceId: "5fdb75cccbfe2f02ce81d4bf_sub-device-1", - Status: "OFFLINE", - } - subDevice2 := iot.DeviceStatus{ - DeviceId: "5fdb75cccbfe2f02ce81d4bf_sub-device-2", - Status: "OFFLINE", - } - subDevice3 := iot.DeviceStatus{ - DeviceId: "5fdb75cccbfe2f02ce81d4bf_sub-device-3", - Status: "ONLINE", - } + var devicesStatus []iot.DeviceStatus + for i := 0; i < 200; i++ { + subDevice := iot.DeviceStatus{ + DeviceId: "5fdb75cccbfe2f02ce81d4bf_sub-device-" + strconv.Itoa(i), + Status: "ONLINE", + } - devicesStatus := []iot.DeviceStatus{subDevice1, subDevice2, subDevice3} + devicesStatus = append(devicesStatus, subDevice) + } ok := device.UpdateSubDeviceState(iot.SubDevicesStatus{ DeviceStatuses: devicesStatus,