Mainflux.mainflux/vendor/github.com/gopcua/opcua/subscription.go

247 lines
7.3 KiB
Go

package opcua
import (
"context"
"time"
"github.com/gopcua/opcua/errors"
"github.com/gopcua/opcua/id"
"github.com/gopcua/opcua/ua"
"github.com/gopcua/opcua/uasc"
)
// Defaults that SubscriptionParameters.setDefaults substitutes for fields
// left at their zero value.
const (
	// DefaultSubscriptionInterval is the default for SubscriptionParameters.Interval.
	DefaultSubscriptionInterval = 100 * time.Millisecond

	// DefaultSubscriptionMaxKeepAliveCount is the default for
	// SubscriptionParameters.MaxKeepAliveCount.
	DefaultSubscriptionMaxKeepAliveCount = 3000

	// DefaultSubscriptionLifetimeCount is the default for
	// SubscriptionParameters.LifetimeCount.
	DefaultSubscriptionLifetimeCount = 10000

	// DefaultSubscriptionMaxNotificationsPerPublish is the default for
	// SubscriptionParameters.MaxNotificationsPerPublish.
	DefaultSubscriptionMaxNotificationsPerPublish = 10000

	// DefaultSubscriptionPriority is the default for SubscriptionParameters.Priority.
	DefaultSubscriptionPriority = 0
)
// Subscription is the client-side handle for one subscription. Its methods
// issue their requests through the Client held in c.
type Subscription struct {
// SubscriptionID is the identifier placed in the requests built by Cancel,
// Monitor and Unmonitor.
SubscriptionID uint32
// RevisedPublishingInterval is multiplied by RevisedMaxKeepAliveCount in
// publishTimeout to obtain the expected keep-alive interval.
// NOTE(review): the Revised* fields presumably hold the values returned by
// the server when the subscription was created - confirm against the code
// that populates them.
RevisedPublishingInterval time.Duration
RevisedLifetimeCount uint32
RevisedMaxKeepAliveCount uint32
// Notifs is the channel that sendNotification writes to.
Notifs chan *PublishNotificationData
// c is the Client used to send requests and to dispatch publish results.
c *Client
}
// SubscriptionParameters collects the caller-supplied settings for a
// subscription. Any field left at its zero value is replaced by setDefaults
// with the matching DefaultSubscription* constant.
type SubscriptionParameters struct {
// Interval defaults to DefaultSubscriptionInterval when zero.
Interval time.Duration
// LifetimeCount defaults to DefaultSubscriptionLifetimeCount when zero.
LifetimeCount uint32
// MaxKeepAliveCount defaults to DefaultSubscriptionMaxKeepAliveCount when zero.
MaxKeepAliveCount uint32
// MaxNotificationsPerPublish defaults to
// DefaultSubscriptionMaxNotificationsPerPublish when zero.
MaxNotificationsPerPublish uint32
// Priority defaults to DefaultSubscriptionPriority when zero.
Priority uint8
// Notifs is the notification channel; when nil, setDefaults creates an
// unbuffered channel.
Notifs chan *PublishNotificationData
}
// NewMonitoredItemCreateRequestWithDefaults builds a MonitoredItemCreateRequest
// for nodeID using a fixed set of defaults: reporting mode, a queue size of 10,
// DiscardOldest enabled, a sampling interval of 0 and no filter.
//
// An attributeID of 0 is replaced by ua.AttributeIDValue. clientHandle is
// copied verbatim into the requested monitoring parameters.
func NewMonitoredItemCreateRequestWithDefaults(nodeID *ua.NodeID, attributeID ua.AttributeID, clientHandle uint32) *ua.MonitoredItemCreateRequest {
	if attributeID == 0 {
		attributeID = ua.AttributeIDValue
	}

	// What to monitor.
	target := &ua.ReadValueID{
		NodeID:       nodeID,
		AttributeID:  attributeID,
		DataEncoding: &ua.QualifiedName{},
	}

	// How to monitor it.
	params := &ua.MonitoringParameters{
		ClientHandle:     clientHandle,
		SamplingInterval: 0.0,
		Filter:           nil,
		QueueSize:        10,
		DiscardOldest:    true,
	}

	return &ua.MonitoredItemCreateRequest{
		ItemToMonitor:       target,
		MonitoringMode:      ua.MonitoringModeReporting,
		RequestedParameters: params,
	}
}
// PublishNotificationData is the element type of the Notifs channels declared
// on Subscription and SubscriptionParameters.
// NOTE(review): presumably exactly one of Error and Value is meaningful per
// message - confirm against the code that constructs these values.
type PublishNotificationData struct {
// SubscriptionID identifies the subscription this message belongs to.
SubscriptionID uint32
// Error carries an error to report to the consumer of the channel.
Error error
// Value carries the notification payload; its concrete type is not fixed here.
Value interface{}
}
// Cancel makes the Client forget this Subscription, so that publishing loops
// cannot deliver notifications to it anymore, and then asks the server to
// delete it.
//
// It returns the error from sending the DeleteSubscriptionsRequest, or the
// service result of the response header when that is not ua.StatusOK.
func (s *Subscription) Cancel() error {
	// Forget locally first, before the delete request goes out.
	s.c.forgetSubscription(s.SubscriptionID)

	var resp *ua.DeleteSubscriptionsResponse
	capture := func(v interface{}) error { return safeAssign(v, &resp) }

	request := &ua.DeleteSubscriptionsRequest{
		SubscriptionIDs: []uint32{s.SubscriptionID},
	}
	if err := s.c.Send(request, capture); err != nil {
		return err
	}

	if status := resp.ResponseHeader.ServiceResult; status != ua.StatusOK {
		return status
	}
	return nil
}
// Monitor sends a CreateMonitoredItemsRequest for items on this subscription,
// with ts as the TimestampsToReturn setting.
//
// It returns the response captured from Send together with Send's error. The
// service result and per-item results are not inspected here; that is left to
// the caller.
func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
	var resp *ua.CreateMonitoredItemsResponse

	// Part 4, 5.12.2.2 CreateMonitoredItems Service Parameters
	err := s.c.Send(
		&ua.CreateMonitoredItemsRequest{
			SubscriptionID:     s.SubscriptionID,
			TimestampsToReturn: ts,
			ItemsToCreate:      items,
		},
		func(v interface{}) error { return safeAssign(v, &resp) },
	)
	return resp, err
}
// Unmonitor sends a DeleteMonitoredItemsRequest for the given monitored item
// IDs on this subscription.
//
// It returns the response captured from Send together with Send's error. The
// service result and per-item results are not inspected here.
func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
	var resp *ua.DeleteMonitoredItemsResponse
	onResponse := func(v interface{}) error {
		return safeAssign(v, &resp)
	}

	request := &ua.DeleteMonitoredItemsRequest{
		SubscriptionID:   s.SubscriptionID,
		MonitoredItemIDs: monitoredItemIDs,
	}

	err := s.c.Send(request, onResponse)
	return resp, err
}
// publish sends a single PublishRequest carrying acks and waits for the
// response for at most publishTimeout().
//
// A nil acks slice is sent as an empty, non-nil slice. The returned response
// is whatever safeAssign captured and is returned alongside the error from
// sendWithTimeout.
func (s *Subscription) publish(acks []*ua.SubscriptionAcknowledgement) (*ua.PublishResponse, error) {
	request := &ua.PublishRequest{SubscriptionAcknowledgements: acks}
	if request.SubscriptionAcknowledgements == nil {
		request.SubscriptionAcknowledgements = []*ua.SubscriptionAcknowledgement{}
	}

	var resp *ua.PublishResponse
	onResponse := func(v interface{}) error {
		return safeAssign(v, &resp)
	}

	err := s.c.sendWithTimeout(request, s.publishTimeout(), onResponse)
	return resp, err
}
// publishTimeout returns how long publish waits for a PublishResponse.
//
// The starting point is the expected keep-alive interval
// (RevisedMaxKeepAliveCount * RevisedPublishingInterval). A value above
// uasc.MaxTimeout is capped to uasc.MaxTimeout; otherwise a value below the
// client's configured RequestTimeout is raised to RequestTimeout.
func (s *Subscription) publishTimeout() time.Duration {
	keepAlive := s.RevisedPublishingInterval * time.Duration(s.RevisedMaxKeepAliveCount)

	switch {
	case keepAlive > uasc.MaxTimeout:
		return uasc.MaxTimeout
	case keepAlive < s.c.cfg.RequestTimeout:
		return s.c.cfg.RequestTimeout
	}
	return keepAlive
}
// Run starts an infinite loop that sends PublishRequests and hands the
// received responses to the Client for delivery to registered Subscriptions.
//
// It is the responsibility of the user to stop Run loops that are no longer
// needed by cancelling ctx. Run may also return before ctx is cancelled when
// an irrecoverable communication error occurs; in that case the Client is
// asked to notify subscriptions of the error first.
func (s *Subscription) Run(ctx context.Context) {
	var pending []*ua.SubscriptionAcknowledgement

	for {
		// Non-blocking cancellation check before every publish round.
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Send the next publish request.
		// Note that res contains data even if an error was returned.
		res, err := s.publish(pending)

		switch err {
		case nil:
			// Success; the response is dispatched below.
		case ua.StatusBadSequenceNumberUnknown:
			// At least one ack has been submitted repeatedly.
			// Ignore the error. Acks are rebuilt below.
		case ua.StatusBadTimeout:
			// Ignore and continue the loop.
		case ua.StatusBadNoSubscription:
			// All subscriptions have been deleted, but the publishing loop is
			// still running. The user will stop the loop or create
			// subscriptions at their discretion.
		default:
			// Irrecoverable error.
			s.c.notifySubscriptionsOfError(ctx, res, err)
			return
		}

		if res != nil {
			// Prepare the SubscriptionAcknowledgements for the next
			// PublishRequest: one per available sequence number.
			pending = make([]*ua.SubscriptionAcknowledgement, 0, len(res.AvailableSequenceNumbers))
			for _, seq := range res.AvailableSequenceNumbers {
				pending = append(pending, &ua.SubscriptionAcknowledgement{
					SubscriptionID: res.SubscriptionID,
					SequenceNumber: seq,
				})
			}
		}

		if err == nil {
			s.c.notifySubscription(ctx, res)
		}
	}
}
// sendNotification writes data to s.Notifs, blocking until either the send
// completes or ctx is done. If ctx is done first, data is dropped.
func (s *Subscription) sendNotification(ctx context.Context, data *PublishNotificationData) {
	select {
	case s.Notifs <- data:
		// Delivered.
	case <-ctx.Done():
		// Cancelled before the send could complete; nothing more to do.
	}
}
// Stats returns a diagnostic struct with metadata about the current subscription.
//
// It reads the server's SubscriptionDiagnosticsArray node and returns the
// entry whose SubscriptionID matches this subscription. An error is returned
// when the read fails, when the value is missing or is not a slice of
// extension objects, or when no matching entry is found. Array elements that
// are nil or do not decode to *ua.SubscriptionDiagnosticsDataType are skipped
// instead of causing a panic.
func (s *Subscription) Stats() (*ua.SubscriptionDiagnosticsDataType, error) {
	// TODO(kung-foo): once browsing feature is merged, attempt to get direct access to the
	// diagnostics node. for example, Prosys lists them like:
	// i=2290/ns=1;g=918ee6f4-2d25-4506-980d-e659441c166d
	// maybe cache the nodeid to speed up future stats queries
	node := s.c.Node(ua.NewNumericNodeID(0, id.Server_ServerDiagnostics_SubscriptionDiagnosticsArray))
	v, err := node.Value()
	if err != nil {
		return nil, err
	}
	if v == nil {
		return nil, errors.Errorf("no SubscriptionDiagnostics value returned for sub=%d", s.SubscriptionID)
	}

	// The payload comes from the server, so check its shape rather than assert it.
	raw := v.Value()
	objs, ok := raw.([]*ua.ExtensionObject)
	if !ok {
		return nil, errors.Errorf("unexpected SubscriptionDiagnostics value type %T for sub=%d", raw, s.SubscriptionID)
	}

	for _, eo := range objs {
		if eo == nil {
			continue
		}
		stat, ok := eo.Value.(*ua.SubscriptionDiagnosticsDataType)
		if !ok || stat == nil {
			// Not a decoded diagnostics entry; it cannot match.
			continue
		}
		if stat.SubscriptionID == s.SubscriptionID {
			return stat, nil
		}
	}

	return nil, errors.Errorf("unable to find SubscriptionDiagnostics for sub=%d", s.SubscriptionID)
}
// setDefaults fills every zero-valued field of p with its package default and
// creates an unbuffered notification channel when Notifs is nil. Fields that
// already hold a non-zero value are left untouched.
func (p *SubscriptionParameters) setDefaults() {
	if p.Interval == 0 {
		p.Interval = DefaultSubscriptionInterval
	}
	if p.MaxKeepAliveCount == 0 {
		p.MaxKeepAliveCount = DefaultSubscriptionMaxKeepAliveCount
	}
	if p.LifetimeCount == 0 {
		p.LifetimeCount = DefaultSubscriptionLifetimeCount
	}
	if p.MaxNotificationsPerPublish == 0 {
		p.MaxNotificationsPerPublish = DefaultSubscriptionMaxNotificationsPerPublish
	}
	if p.Priority == 0 {
		// DefaultSubscriptionPriority is currently 0, which makes this
		// assignment a no-op. It is kept so that the default priority can be
		// changed in a single place and is visible as a named constant.
		p.Priority = DefaultSubscriptionPriority
	}
	if p.Notifs == nil {
		p.Notifs = make(chan *PublishNotificationData)
	}
}