247 lines
7.3 KiB
Go
247 lines
7.3 KiB
Go
package opcua
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/gopcua/opcua/errors"
|
|
"github.com/gopcua/opcua/id"
|
|
"github.com/gopcua/opcua/ua"
|
|
"github.com/gopcua/opcua/uasc"
|
|
)
|
|
|
|
// Default values that SubscriptionParameters.setDefaults substitutes for
// fields that were left at their zero value.
const (
	// DefaultSubscriptionMaxNotificationsPerPublish replaces a zero
	// SubscriptionParameters.MaxNotificationsPerPublish.
	DefaultSubscriptionMaxNotificationsPerPublish = 10000

	// DefaultSubscriptionLifetimeCount replaces a zero
	// SubscriptionParameters.LifetimeCount.
	DefaultSubscriptionLifetimeCount = 10000

	// DefaultSubscriptionMaxKeepAliveCount replaces a zero
	// SubscriptionParameters.MaxKeepAliveCount.
	DefaultSubscriptionMaxKeepAliveCount = 3000

	// DefaultSubscriptionInterval replaces a zero
	// SubscriptionParameters.Interval.
	DefaultSubscriptionInterval = 100 * time.Millisecond

	// DefaultSubscriptionPriority replaces a zero
	// SubscriptionParameters.Priority.
	DefaultSubscriptionPriority = 0
)
|
|
|
|
type Subscription struct {
|
|
SubscriptionID uint32
|
|
RevisedPublishingInterval time.Duration
|
|
RevisedLifetimeCount uint32
|
|
RevisedMaxKeepAliveCount uint32
|
|
Notifs chan *PublishNotificationData
|
|
c *Client
|
|
}
|
|
|
|
type SubscriptionParameters struct {
|
|
Interval time.Duration
|
|
LifetimeCount uint32
|
|
MaxKeepAliveCount uint32
|
|
MaxNotificationsPerPublish uint32
|
|
Priority uint8
|
|
Notifs chan *PublishNotificationData
|
|
}
|
|
|
|
func NewMonitoredItemCreateRequestWithDefaults(nodeID *ua.NodeID, attributeID ua.AttributeID, clientHandle uint32) *ua.MonitoredItemCreateRequest {
|
|
if attributeID == 0 {
|
|
attributeID = ua.AttributeIDValue
|
|
}
|
|
return &ua.MonitoredItemCreateRequest{
|
|
ItemToMonitor: &ua.ReadValueID{
|
|
NodeID: nodeID,
|
|
AttributeID: attributeID,
|
|
DataEncoding: &ua.QualifiedName{},
|
|
},
|
|
MonitoringMode: ua.MonitoringModeReporting,
|
|
RequestedParameters: &ua.MonitoringParameters{
|
|
ClientHandle: clientHandle,
|
|
DiscardOldest: true,
|
|
Filter: nil,
|
|
QueueSize: 10,
|
|
SamplingInterval: 0.0,
|
|
},
|
|
}
|
|
}
|
|
|
|
// PublishNotificationData is the element type of the Notifs channels of
// Subscription and SubscriptionParameters.
type PublishNotificationData struct {
	// SubscriptionID is the ID of the subscription the data refers to.
	SubscriptionID uint32

	// Error carries an error to be reported to the receiver, if any.
	Error error

	// Value carries the notification payload.
	// NOTE(review): the concrete types stored here are not visible in this
	// file - confirm against the code that fills the channel.
	Value interface{}
}
|
|
|
|
// Cancel() deletes the Subscription from Server and makes the Client forget it so that publishing
|
|
// loops cannot deliver notifications to it anymore
|
|
func (s *Subscription) Cancel() error {
|
|
s.c.forgetSubscription(s.SubscriptionID)
|
|
|
|
req := &ua.DeleteSubscriptionsRequest{
|
|
SubscriptionIDs: []uint32{s.SubscriptionID},
|
|
}
|
|
var res *ua.DeleteSubscriptionsResponse
|
|
err := s.c.Send(req, func(v interface{}) error {
|
|
return safeAssign(v, &res)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if res.ResponseHeader.ServiceResult != ua.StatusOK {
|
|
return res.ResponseHeader.ServiceResult
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
|
|
// Part 4, 5.12.2.2 CreateMonitoredItems Service Parameters
|
|
req := &ua.CreateMonitoredItemsRequest{
|
|
SubscriptionID: s.SubscriptionID,
|
|
TimestampsToReturn: ts,
|
|
ItemsToCreate: items,
|
|
}
|
|
|
|
var res *ua.CreateMonitoredItemsResponse
|
|
err := s.c.Send(req, func(v interface{}) error {
|
|
return safeAssign(v, &res)
|
|
})
|
|
return res, err
|
|
}
|
|
|
|
func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
|
|
req := &ua.DeleteMonitoredItemsRequest{
|
|
MonitoredItemIDs: monitoredItemIDs,
|
|
SubscriptionID: s.SubscriptionID,
|
|
}
|
|
var res *ua.DeleteMonitoredItemsResponse
|
|
err := s.c.Send(req, func(v interface{}) error {
|
|
return safeAssign(v, &res)
|
|
})
|
|
return res, err
|
|
}
|
|
|
|
func (s *Subscription) publish(acks []*ua.SubscriptionAcknowledgement) (*ua.PublishResponse, error) {
|
|
if acks == nil {
|
|
acks = []*ua.SubscriptionAcknowledgement{}
|
|
}
|
|
req := &ua.PublishRequest{
|
|
SubscriptionAcknowledgements: acks,
|
|
}
|
|
|
|
var res *ua.PublishResponse
|
|
err := s.c.sendWithTimeout(req, s.publishTimeout(), func(v interface{}) error {
|
|
return safeAssign(v, &res)
|
|
})
|
|
return res, err
|
|
}
|
|
|
|
func (s *Subscription) publishTimeout() time.Duration {
|
|
timeout := time.Duration(s.RevisedMaxKeepAliveCount) * s.RevisedPublishingInterval // expected keepalive interval
|
|
if timeout > uasc.MaxTimeout {
|
|
return uasc.MaxTimeout
|
|
}
|
|
if timeout < s.c.cfg.RequestTimeout {
|
|
return s.c.cfg.RequestTimeout
|
|
}
|
|
return timeout
|
|
}
|
|
|
|
// Run() starts an infinite loop that sends PublishRequests and delivers received
|
|
// notifications to registered Subscriptions.
|
|
// It is the responsibility of the user to stop no longer needed Run() loops by cancelling ctx
|
|
// Note that Run() may return before ctx is cancelled in case of an irrecoverable communication error
|
|
func (s *Subscription) Run(ctx context.Context) {
|
|
var acks []*ua.SubscriptionAcknowledgement
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
// send the next publish request
|
|
// note that res contains data even if an error was returned
|
|
res, err := s.publish(acks)
|
|
switch {
|
|
case err == ua.StatusBadSequenceNumberUnknown:
|
|
// At least one ack has been submitted repeatedly
|
|
// Ignore the error. Acks will be cleared below
|
|
case err == ua.StatusBadTimeout:
|
|
// ignore and continue the loop
|
|
case err == ua.StatusBadNoSubscription:
|
|
// All subscriptions have been deleted, but the publishing loop is still running
|
|
// The user will stop the loop or create subscriptions at his discretion
|
|
case err != nil:
|
|
// irrecoverable error
|
|
s.c.notifySubscriptionsOfError(ctx, res, err)
|
|
return
|
|
}
|
|
|
|
if res != nil {
|
|
// Prepare SubscriptionAcknowledgement for next PublishRequest
|
|
acks = make([]*ua.SubscriptionAcknowledgement, 0)
|
|
if res.AvailableSequenceNumbers != nil {
|
|
for _, i := range res.AvailableSequenceNumbers {
|
|
ack := &ua.SubscriptionAcknowledgement{
|
|
SubscriptionID: res.SubscriptionID,
|
|
SequenceNumber: i,
|
|
}
|
|
acks = append(acks, ack)
|
|
}
|
|
}
|
|
}
|
|
|
|
if err == nil {
|
|
s.c.notifySubscription(ctx, res)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Subscription) sendNotification(ctx context.Context, data *PublishNotificationData) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case s.Notifs <- data:
|
|
}
|
|
|
|
}
|
|
|
|
// Stats returns a diagnostic struct with metadata about the current subscription
|
|
func (s *Subscription) Stats() (*ua.SubscriptionDiagnosticsDataType, error) {
|
|
// TODO(kung-foo): once browsing feature is merged, attempt to get direct access to the
|
|
// diagnostics node. for example, Prosys lists them like:
|
|
// i=2290/ns=1;g=918ee6f4-2d25-4506-980d-e659441c166d
|
|
// maybe cache the nodeid to speed up future stats queries
|
|
node := s.c.Node(ua.NewNumericNodeID(0, id.Server_ServerDiagnostics_SubscriptionDiagnosticsArray))
|
|
v, err := node.Value()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, eo := range v.Value().([]*ua.ExtensionObject) {
|
|
stat := eo.Value.(*ua.SubscriptionDiagnosticsDataType)
|
|
|
|
if stat.SubscriptionID == s.SubscriptionID {
|
|
return stat, nil
|
|
}
|
|
}
|
|
|
|
return nil, errors.Errorf("unable to find SubscriptionDiagnostics for sub=%d", s.SubscriptionID)
|
|
}
|
|
|
|
func (p *SubscriptionParameters) setDefaults() {
|
|
if p.MaxNotificationsPerPublish == 0 {
|
|
p.MaxNotificationsPerPublish = DefaultSubscriptionMaxNotificationsPerPublish
|
|
}
|
|
if p.LifetimeCount == 0 {
|
|
p.LifetimeCount = DefaultSubscriptionLifetimeCount
|
|
}
|
|
if p.MaxKeepAliveCount == 0 {
|
|
p.MaxKeepAliveCount = DefaultSubscriptionMaxKeepAliveCount
|
|
}
|
|
if p.Interval == 0 {
|
|
p.Interval = DefaultSubscriptionInterval
|
|
}
|
|
if p.Priority == 0 {
|
|
// DefaultSubscriptionPriority is 0 at the time of writing, so this redundant assignment is
|
|
// made only to allow for a one-liner change of default priority should a need arise
|
|
// and to explicitly expose the default priority as a constant
|
|
p.Priority = DefaultSubscriptionPriority
|
|
}
|
|
if p.Notifs == nil {
|
|
p.Notifs = make(chan *PublishNotificationData)
|
|
}
|
|
}
|