NOISSUE - Add opcua-adapter conn route-map, use ServerURI and NodeID (#975)

* NOISSUE - Add opcua-adapter conn route-map, use ServerURI and NodeID

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* NOISSUE - Add dynamic subscription

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
Manuel Imperiale 2019-12-09 23:16:18 +01:00 committed by Drasko DRASKOVIC
parent 5120a71595
commit 76b68e10a8
12 changed files with 308 additions and 282 deletions

View File

@ -62,13 +62,16 @@ const (
envRouteMapDB = "MF_OPCUA_ADAPTER_ROUTE_MAP_DB"
envNodesConfig = "MF_OPCUA_ADAPTER_CONFIG_FILE"
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"
columns = 2
)
type config struct {
httpPort string
opcConfig opcua.Config
opcuaConfig opcua.Config
natsURL string
logLevel string
esURL string
@ -95,15 +98,19 @@ func main() {
rmConn := connectToRedis(cfg.routeMapURL, cfg.routeMapPass, cfg.routeMapDB, logger)
defer rmConn.Close()
thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)
connRM := newRouteMapRepositoy(rmConn, connectionRMPrefix, logger)
esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer esConn.Close()
publisher := pub.NewMessagePublisher(natsConn)
thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)
ctx := context.Background()
pubsub := gopcua.NewPubSub(ctx, publisher, thingRM, chanRM, connRM, logger)
svc := opcua.New(publisher, thingRM, chanRM)
svc := opcua.New(pubsub, thingRM, chanRM, connRM, cfg.opcuaConfig, logger)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
@ -121,7 +128,7 @@ func main() {
}, []string{"method"}),
)
go subscribeToOpcuaServers(svc, cfg.nodesConfig, cfg.opcConfig, logger)
//go subscribeToNodesFromFile(svc, cfg.nodesConfig, cfg.opcuaConfig, logger)
go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger)
errs := make(chan error, 2)
@ -147,7 +154,7 @@ func loadConfig() config {
}
return config{
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
opcConfig: oc,
opcuaConfig: oc,
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
esURL: mainflux.Env(envESURL, defESURL),
@ -186,15 +193,7 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *
})
}
func readFromOpcuaServer(svc opcua.Service, cfg opcua.Config, logger logger.Logger) {
ctx := context.Background()
gr := gopcua.NewReader(ctx, svc, logger)
if err := gr.Read(cfg); err != nil {
logger.Warn(fmt.Sprintf("OPC-UA Read failed: %s", err))
}
}
func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) {
func subscribeToNodesFromFile(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) {
if _, err := os.Stat(nodes); os.IsNotExist(err) {
logger.Warn(fmt.Sprintf("Config file not found: %s", err))
return
@ -208,10 +207,6 @@ func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config,
defer file.Close()
reader := csv.NewReader(file)
ctx := context.Background()
gc := gopcua.NewClient(ctx, svc, logger)
for {
l, err := reader.Read()
if err == io.EOF {
@ -222,17 +217,17 @@ func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config,
return
}
if len(l) < 4 {
if len(l) < columns {
logger.Warn(fmt.Sprintf("Empty or incomplete line found in file"))
return
}
cfg.ServerURI = l[0]
cfg.NodeNamespace = l[1]
cfg.NodeIdentifierType = l[2]
cfg.NodeIdentifier = l[3]
cfg.NodeID = l[1]
go subscribeToOpcuaServer(gc, cfg, logger)
if err := svc.Subscribe(cfg); err != nil {
logger.Warn(fmt.Sprintf("OPC-UA Subscription failed: %s", err))
}
}
}

View File

@ -1,4 +1 @@
opc.tcp://opcua.rocks:4840,0,i,2255
opc.tcp://opcua.rocks:4840,0,i,2256
opc.tcp://opcua.rocks:4840,1,i,2255
opc.tcp://opcua.rocks:4840,1,i,2256
opc.tcp://opcua.rocks:4840,ns=0;i=2256

1 opc.tcp://opcua.rocks:4840 0 ns=0;i=2256 i 2255
opc.tcp://opcua.rocks:4840 0 i 2256
opc.tcp://opcua.rocks:4840 1 i 2255
opc.tcp://opcua.rocks:4840 1 i 2256

View File

@ -4,7 +4,6 @@
package api
import (
"context"
"fmt"
"time"
@ -27,9 +26,9 @@ func LoggingMiddleware(svc opcua.Service, logger logger.Logger) opcua.Service {
}
}
func (lm loggingMiddleware) CreateThing(mfxThing string, opcID string) (err error) {
func (lm loggingMiddleware) CreateThing(mfxThing, opcuaNodeID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("create_thing mfx:opcua:%s:%s took %s to complete", mfxThing, opcID, time.Since(begin))
message := fmt.Sprintf("create_thing %s with NodeID %s, took %s to complete", mfxThing, opcuaNodeID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -37,12 +36,12 @@ func (lm loggingMiddleware) CreateThing(mfxThing string, opcID string) (err erro
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.CreateThing(mfxThing, opcID)
return lm.svc.CreateThing(mfxThing, opcuaNodeID)
}
func (lm loggingMiddleware) UpdateThing(mfxThing string, opcID string) (err error) {
func (lm loggingMiddleware) UpdateThing(mfxThing, opcuaNodeID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("update_thing mfx:opcua:%s:%s took %s to complete", mfxThing, opcID, time.Since(begin))
message := fmt.Sprintf("update_thing %s with NodeID %s, took %s to complete", mfxThing, opcuaNodeID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -50,12 +49,12 @@ func (lm loggingMiddleware) UpdateThing(mfxThing string, opcID string) (err erro
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.UpdateThing(mfxThing, opcID)
return lm.svc.UpdateThing(mfxThing, opcuaNodeID)
}
func (lm loggingMiddleware) RemoveThing(mfxThing string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("remove_thing mfx:opcua:%s took %s to complete", mfxThing, time.Since(begin))
message := fmt.Sprintf("remove_thing %s, took %s to complete", mfxThing, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -66,9 +65,9 @@ func (lm loggingMiddleware) RemoveThing(mfxThing string) (err error) {
return lm.svc.RemoveThing(mfxThing)
}
func (lm loggingMiddleware) CreateChannel(mfxChan string, opcNamespace string) (err error) {
func (lm loggingMiddleware) CreateChannel(mfxChan, opcuaServerURI string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("create_channel mfx:opcua:%s:%s took %s to complete", mfxChan, opcNamespace, time.Since(begin))
message := fmt.Sprintf("create_channel %s with ServerURI %s, took %s to complete", mfxChan, opcuaServerURI, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -76,12 +75,12 @@ func (lm loggingMiddleware) CreateChannel(mfxChan string, opcNamespace string) (
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.CreateChannel(mfxChan, opcNamespace)
return lm.svc.CreateChannel(mfxChan, opcuaServerURI)
}
func (lm loggingMiddleware) UpdateChannel(mfxChanID string, opcNamespace string) (err error) {
func (lm loggingMiddleware) UpdateChannel(mfxChanID, opcuaServerURI string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("update_channel mfx:opcua:%s:%s took %s to complete", mfxChanID, opcNamespace, time.Since(begin))
message := fmt.Sprintf("update_channel %s with ServerURI %s, took %s to complete", mfxChanID, opcuaServerURI, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -89,12 +88,12 @@ func (lm loggingMiddleware) UpdateChannel(mfxChanID string, opcNamespace string)
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.UpdateChannel(mfxChanID, opcNamespace)
return lm.svc.UpdateChannel(mfxChanID, opcuaServerURI)
}
func (lm loggingMiddleware) RemoveChannel(mfxChanID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("remove_channel mfx_channel_%s took %s to complete", mfxChanID, time.Since(begin))
message := fmt.Sprintf("remove_channel %s, took %s to complete", mfxChanID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -105,9 +104,9 @@ func (lm loggingMiddleware) RemoveChannel(mfxChanID string) (err error) {
return lm.svc.RemoveChannel(mfxChanID)
}
func (lm loggingMiddleware) Publish(ctx context.Context, token string, m opcua.Message) (err error) {
func (lm loggingMiddleware) ConnectThing(mfxChanID, mfxThingID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("publish namespace/%s/id/%s/rx took %s to complete", m.Namespace, m.ID, time.Since(begin))
message := fmt.Sprintf("connect_thing for channel %s and thing %s, took %s to complete", mfxChanID, mfxThingID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -115,5 +114,31 @@ func (lm loggingMiddleware) Publish(ctx context.Context, token string, m opcua.M
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.Publish(ctx, token, m)
return lm.svc.ConnectThing(mfxChanID, mfxThingID)
}
func (lm loggingMiddleware) DisconnectThing(mfxChanID, mfxThingID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("disconnect_thing mfx-%s : mfx-%s, took %s to complete", mfxChanID, mfxThingID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.DisconnectThing(mfxChanID, mfxThingID)
}
func (lm loggingMiddleware) Subscribe(cfg opcua.Config) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("subscribe to server %s and node_id %s, took %s to complete", cfg.ServerURI, cfg.NodeID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.Subscribe(cfg)
}

View File

@ -4,7 +4,6 @@
package api
import (
"context"
"time"
"github.com/go-kit/kit/metrics"
@ -28,22 +27,22 @@ func MetricsMiddleware(svc opcua.Service, counter metrics.Counter, latency metri
}
}
func (mm *metricsMiddleware) CreateThing(mfxDevID string, opcID string) error {
func (mm *metricsMiddleware) CreateThing(mfxDevID, opcuaNodeID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "create_thing").Add(1)
mm.latency.With("method", "create_thing").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.CreateThing(mfxDevID, opcID)
return mm.svc.CreateThing(mfxDevID, opcuaNodeID)
}
func (mm *metricsMiddleware) UpdateThing(mfxDevID string, opcID string) error {
func (mm *metricsMiddleware) UpdateThing(mfxDevID, opcuaNodeID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "update_thing").Add(1)
mm.latency.With("method", "update_thing").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.UpdateThing(mfxDevID, opcID)
return mm.svc.UpdateThing(mfxDevID, opcuaNodeID)
}
func (mm *metricsMiddleware) RemoveThing(mfxDevID string) error {
@ -55,22 +54,22 @@ func (mm *metricsMiddleware) RemoveThing(mfxDevID string) error {
return mm.svc.RemoveThing(mfxDevID)
}
func (mm *metricsMiddleware) CreateChannel(mfxChanID string, opcNamespace string) error {
func (mm *metricsMiddleware) CreateChannel(mfxChanID, opcuaServerURI string) error {
defer func(begin time.Time) {
mm.counter.With("method", "create_channel").Add(1)
mm.latency.With("method", "create_channel").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.CreateChannel(mfxChanID, opcNamespace)
return mm.svc.CreateChannel(mfxChanID, opcuaServerURI)
}
func (mm *metricsMiddleware) UpdateChannel(mfxChanID string, opcNamespace string) error {
func (mm *metricsMiddleware) UpdateChannel(mfxChanID, opcuaServerURI string) error {
defer func(begin time.Time) {
mm.counter.With("method", "update_channel").Add(1)
mm.latency.With("method", "update_channel").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.UpdateChannel(mfxChanID, opcNamespace)
return mm.svc.UpdateChannel(mfxChanID, opcuaServerURI)
}
func (mm *metricsMiddleware) RemoveChannel(mfxChanID string) error {
@ -82,11 +81,29 @@ func (mm *metricsMiddleware) RemoveChannel(mfxChanID string) error {
return mm.svc.RemoveChannel(mfxChanID)
}
func (mm *metricsMiddleware) Publish(ctx context.Context, token string, m opcua.Message) error {
func (mm *metricsMiddleware) ConnectThing(mfxChanID, mfxThingID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "publish").Add(1)
mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
mm.counter.With("method", "connect_thing").Add(1)
mm.latency.With("method", "connect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.Publish(ctx, token, m)
return mm.svc.ConnectThing(mfxChanID, mfxThingID)
}
func (mm *metricsMiddleware) DisconnectThing(mfxChanID, mfxThingID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "disconnect_thing").Add(1)
mm.latency.With("method", "disconnect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.DisconnectThing(mfxChanID, mfxThingID)
}
func (mm *metricsMiddleware) Subscribe(cfg opcua.Config) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.Subscribe(cfg)
}

View File

@ -10,6 +10,7 @@ import (
opcuaGopcua "github.com/gopcua/opcua"
uaGopcua "github.com/gopcua/opcua/ua"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/opcua"
@ -29,22 +30,28 @@ var (
var _ opcua.Subscriber = (*client)(nil)
type client struct {
ctx context.Context
svc opcua.Service
logger logger.Logger
ctx context.Context
publisher mainflux.MessagePublisher
thingsRM opcua.RouteMapRepository
channelsRM opcua.RouteMapRepository
connectRM opcua.RouteMapRepository
logger logger.Logger
}
// NewClient returns new OPC-UA client instance.
func NewClient(ctx context.Context, svc opcua.Service, log logger.Logger) opcua.Subscriber {
// NewPubSub returns new OPC-UA client instance.
func NewPubSub(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber {
return client{
ctx: ctx,
svc: svc,
logger: log,
ctx: ctx,
publisher: pub,
thingsRM: thingsRM,
channelsRM: channelsRM,
connectRM: connectRM,
logger: log,
}
}
// Subscribe subscribes to the OPC-UA Server.
func (b client) Subscribe(cfg opcua.Config) error {
func (c client) Subscribe(cfg opcua.Config) error {
opts := []opcuaGopcua.Option{
opcuaGopcua.SecurityMode(uaGopcua.MessageSecurityModeNone),
}
@ -70,13 +77,13 @@ func (b client) Subscribe(cfg opcua.Config) error {
}
}
c := opcuaGopcua.NewClient(cfg.ServerURI, opts...)
if err := c.Connect(b.ctx); err != nil {
oc := opcuaGopcua.NewClient(cfg.ServerURI, opts...)
if err := oc.Connect(c.ctx); err != nil {
return errors.Wrap(errFailedConn, err)
}
defer c.Close()
defer oc.Close()
sub, err := c.Subscribe(&opcuaGopcua.SubscriptionParameters{
sub, err := oc.Subscribe(&opcuaGopcua.SubscriptionParameters{
Interval: 2000 * time.Millisecond,
})
if err != nil {
@ -84,19 +91,15 @@ func (b client) Subscribe(cfg opcua.Config) error {
}
defer sub.Cancel()
b.logger.Info(fmt.Sprintf("OPC-UA server URI: %s", cfg.ServerURI))
b.logger.Info(fmt.Sprintf("Created subscription with id %v", sub.SubscriptionID))
if err := b.runHandler(sub, cfg); err != nil {
if err := c.runHandler(sub, cfg); err != nil {
return err
}
return nil
}
func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) error {
nid := fmt.Sprintf("ns=%s;%s=%s", cfg.NodeNamespace, cfg.NodeIdentifierType, cfg.NodeIdentifier)
nodeID, err := uaGopcua.ParseNodeID(nid)
func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) error {
nodeID, err := uaGopcua.ParseNodeID(cfg.NodeID)
if err != nil {
return errors.Wrap(errFailedParseNodeID, err)
}
@ -112,15 +115,15 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
return errResponseStatus
}
go sub.Run(b.ctx)
go sub.Run(c.ctx)
for {
select {
case <-b.ctx.Done():
case <-c.ctx.Done():
return nil
case res := <-sub.Notifs:
if res.Error != nil {
b.logger.Error(res.Error.Error())
c.logger.Error(res.Error.Error())
continue
}
@ -128,8 +131,9 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
case *uaGopcua.DataChangeNotification:
for _, item := range x.MonitoredItems {
msg := opcua.Message{
Namespace: cfg.NodeNamespace,
ID: cfg.NodeIdentifier,
ServerURI: cfg.ServerURI,
NodeID: cfg.NodeID,
Type: item.Value.Value.Type().String(),
}
switch item.Value.Value.Type() {
@ -139,7 +143,7 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
msg.Data = item.Value.Value.Int()
case uaGopcua.TypeIDUint64:
msg.Data = item.Value.Value.Uint()
case uaGopcua.TypeIDFloat:
case uaGopcua.TypeIDFloat, uaGopcua.TypeIDDouble:
msg.Data = item.Value.Value.Float()
case uaGopcua.TypeIDString:
msg.Data = item.Value.Value.String()
@ -147,13 +151,51 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
msg.Data = 0
}
// Publish on Mainflux NATS broker
b.svc.Publish(b.ctx, "", msg)
c.Publish(c.ctx, "", msg)
}
default:
b.logger.Info(fmt.Sprintf("what's this publish result? %T", res.Value))
c.logger.Info(fmt.Sprintf("unknown publish result: %T", res.Value))
}
}
}
}
// Publish forwards messages from OPC-UA MQTT broker to Mainflux NATS broker
func (c client) Publish(ctx context.Context, token string, m opcua.Message) error {
// Get route-map of the OPC-UA ServerURI
chanID, err := c.channelsRM.Get(m.ServerURI)
if err != nil {
return opcua.ErrNotFoundServerURI
}
// Get route-map of the OPC-UA NodeID
thingID, err := c.thingsRM.Get(m.NodeID)
if err != nil {
return opcua.ErrNotFoundNodeID
}
// Check connection between ServerURI and NodeID
cKey := fmt.Sprintf("%s:%s", chanID, thingID)
if _, err := c.connectRM.Get(cKey); err != nil {
return opcua.ErrNotFoundConn
}
// Publish on Mainflux NATS broker
SenML := fmt.Sprintf(`[{"n":"%s","v":%v}]`, m.Type, m.Data)
payload := []byte(SenML)
msg := mainflux.Message{
Publisher: thingID,
Protocol: "opcua",
ContentType: "Content-Type",
Channel: chanID,
Payload: payload,
}
if err := c.publisher.Publish(ctx, token, msg); err != nil {
return err
}
c.logger.Info(fmt.Sprintf("publish from server %s and node_id %s with value %v", m.ServerURI, m.NodeID, m.Data))
return nil
}

View File

@ -1,73 +0,0 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package gopcua
import (
"context"
"fmt"
opcuaGopcua "github.com/gopcua/opcua"
uaGopcua "github.com/gopcua/opcua/ua"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/opcua"
)
var _ opcua.Reader = (*reader)(nil)
type reader struct {
ctx context.Context
svc opcua.Service
logger logger.Logger
}
// NewReader returns new OPC-UA reader instance.
func NewReader(ctx context.Context, svc opcua.Service, log logger.Logger) opcua.Reader {
return reader{
ctx: ctx,
svc: svc,
logger: log,
}
}
// Read reads a given OPC-UA Server endpoint.
func (r reader) Read(cfg opcua.Config) error {
c := opcuaGopcua.NewClient(cfg.ServerURI, opcuaGopcua.SecurityMode(uaGopcua.MessageSecurityModeNone))
if err := c.Connect(r.ctx); err != nil {
return errors.Wrap(errFailedConn, err)
}
defer c.Close()
nid := fmt.Sprintf("ns=%s;%s=%s", cfg.NodeNamespace, cfg.NodeIdentifierType, cfg.NodeIdentifier)
id, err := uaGopcua.ParseNodeID(nid)
if err != nil {
return errors.Wrap(errFailedParseNodeID, err)
}
req := &uaGopcua.ReadRequest{
MaxAge: 2000,
NodesToRead: []*uaGopcua.ReadValueID{
&uaGopcua.ReadValueID{NodeID: id},
},
TimestampsToReturn: uaGopcua.TimestampsToReturnBoth,
}
resp, err := c.Read(req)
if err != nil {
return errors.Wrap(errFailedRead, err)
}
if resp.Results[0].Status != uaGopcua.StatusOK {
return errResponseStatus
}
// Publish on Mainflux NATS broker
msg := opcua.Message{
Namespace: cfg.NodeNamespace,
ID: cfg.NodeIdentifier,
Data: resp.Results[0].Value.Float(),
}
r.svc.Publish(r.ctx, "", msg)
return nil
}

View File

@ -5,7 +5,8 @@ package opcua
// Message represent an OPC-UA message
type Message struct {
Namespace string `json:"namespace"`
ID string `json:"id"`
Data interface{} `json:"data"`
ServerURI string
NodeID string
Type string
Data interface{}
}

View File

@ -1,10 +0,0 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package opcua
// Reader represents the OPC-UA client.
type Reader interface {
// Read given OPC-UA Server NodeID (Namespace + ID).
Read(Config) error
}

View File

@ -4,17 +4,22 @@
package redis
type createThingEvent struct {
id string
opcuaNodeIdentifier string
id string
opcuaNodeID string
}
type removeThingEvent struct {
id string
}
type connectThingEvent struct {
chanID string
thingID string
}
type createChannelEvent struct {
id string
opcuaNodeNamespace string
id string
opcuaServerURI string
}
type removeChannelEvent struct {

View File

@ -10,11 +10,6 @@ import (
"github.com/mainflux/mainflux/opcua"
)
const (
mfxMapPrefix = "mfx:opcua"
opcMapPrefix = "opcua:mfx"
)
var _ opcua.RouteMapRepository = (*routerMap)(nil)
type routerMap struct {
@ -30,12 +25,13 @@ func NewRouteMapRepository(client *redis.Client, prefix string) opcua.RouteMapRe
}
}
func (mr *routerMap) Save(mfxID, opcID string) error {
tkey := fmt.Sprintf("%s:%s:%s", mr.prefix, mfxMapPrefix, mfxID)
if err := mr.client.Set(tkey, opcID, 0).Err(); err != nil {
func (mr *routerMap) Save(mfxID, opcuaID string) error {
tkey := fmt.Sprintf("%s:%s", mr.prefix, mfxID)
if err := mr.client.Set(tkey, opcuaID, 0).Err(); err != nil {
return err
}
lkey := fmt.Sprintf("%s:%s:%s", mr.prefix, opcMapPrefix, opcID)
lkey := fmt.Sprintf("%s:%s", mr.prefix, opcuaID)
if err := mr.client.Set(lkey, mfxID, 0).Err(); err != nil {
return err
}
@ -43,8 +39,8 @@ func (mr *routerMap) Save(mfxID, opcID string) error {
return nil
}
func (mr *routerMap) Get(mfxID string) (string, error) {
lKey := fmt.Sprintf("%s:%s:%s", mr.prefix, opcMapPrefix, mfxID)
func (mr *routerMap) Get(opcuaID string) (string, error) {
lKey := fmt.Sprintf("%s:%s", mr.prefix, opcuaID)
mval, err := mr.client.Get(lKey).Result()
if err != nil {
return "", err
@ -54,12 +50,12 @@ func (mr *routerMap) Get(mfxID string) (string, error) {
}
func (mr *routerMap) Remove(mfxID string) error {
mkey := fmt.Sprintf("%s:%s:%s", mr.prefix, mfxMapPrefix, mfxID)
mkey := fmt.Sprintf("%s:%s", mr.prefix, mfxID)
lval, err := mr.client.Get(mkey).Result()
if err != nil {
return err
}
lkey := fmt.Sprintf("%s:%s:%s", mr.prefix, opcMapPrefix, lval)
lkey := fmt.Sprintf("%s:%s", mr.prefix, lval)
return mr.client.Del(mkey, lkey).Err()
}

View File

@ -14,17 +14,19 @@ import (
)
const (
keyProtocol = "opcua"
keyIdentifier = "identifier"
keyNamespace = "namespace"
keyProtocol = "opcua"
keyNodeID = "nodeID"
keyServerURI = "serverURI"
group = "mainflux.opcua"
stream = "mainflux.things"
thingPrefix = "thing."
thingCreate = thingPrefix + "create"
thingUpdate = thingPrefix + "update"
thingRemove = thingPrefix + "remove"
thingPrefix = "thing."
thingCreate = thingPrefix + "create"
thingUpdate = thingPrefix + "update"
thingRemove = thingPrefix + "remove"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"
channelPrefix = "channel."
channelCreate = channelPrefix + "create"
@ -39,9 +41,9 @@ var (
errMetadataFormat = errors.New("malformed metadata")
errMetadataNamespace = errors.New("Node Namespace not found in channel metadatada")
errMetadataServerURI = errors.New("ServerURI not found in channel metadatada")
errMetadataIdentifier = errors.New("Node Identifier not found in thing metadatada")
errMetadataNodeID = errors.New("NodeID not found in thing metadatada")
)
var _ opcua.EventStore = (*eventStore)(nil)
@ -119,6 +121,12 @@ func (es eventStore) Subscribe(subject string) error {
case channelRemove:
rce := decodeRemoveChannel(event)
err = es.handleRemoveChannel(rce)
case thingConnect:
rce := decodeConnectThing(event)
err = es.handleConnectThing(rce)
case thingDisconnect:
rce := decodeDisconnectThing(event)
err = es.handleDisconnectThing(rce)
}
if err != nil && err != errMetadataType {
es.logger.Warn(fmt.Sprintf("Failed to handle event sourcing: %s", err.Error()))
@ -150,12 +158,12 @@ func decodeCreateThing(event map[string]interface{}) (createThingEvent, error) {
return createThingEvent{}, errMetadataFormat
}
val, ok := metadataVal[keyIdentifier].(string)
val, ok := metadataVal[keyNodeID].(string)
if !ok || val == "" {
return createThingEvent{}, errMetadataIdentifier
return createThingEvent{}, errMetadataNodeID
}
cte.opcuaNodeIdentifier = val
cte.opcuaNodeID = val
return cte, nil
}
@ -186,12 +194,12 @@ func decodeCreateChannel(event map[string]interface{}) (createChannelEvent, erro
return createChannelEvent{}, errMetadataFormat
}
val, ok := metadataVal[keyNamespace].(string)
val, ok := metadataVal[keyServerURI].(string)
if !ok || val == "" {
return createChannelEvent{}, errMetadataNamespace
return createChannelEvent{}, errMetadataServerURI
}
cce.opcuaNodeNamespace = val
cce.opcuaServerURI = val
return cce, nil
}
@ -201,8 +209,22 @@ func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent {
}
}
func decodeConnectThing(event map[string]interface{}) connectThingEvent {
return connectThingEvent{
chanID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
}
}
func decodeDisconnectThing(event map[string]interface{}) connectThingEvent {
return connectThingEvent{
chanID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
}
}
func (es eventStore) handleCreateThing(cte createThingEvent) error {
return es.svc.CreateThing(cte.id, cte.opcuaNodeIdentifier)
return es.svc.CreateThing(cte.id, cte.opcuaNodeID)
}
func (es eventStore) handleRemoveThing(rte removeThingEvent) error {
@ -210,13 +232,21 @@ func (es eventStore) handleRemoveThing(rte removeThingEvent) error {
}
func (es eventStore) handleCreateChannel(cce createChannelEvent) error {
return es.svc.CreateChannel(cce.id, cce.opcuaNodeNamespace)
return es.svc.CreateChannel(cce.id, cce.opcuaServerURI)
}
func (es eventStore) handleRemoveChannel(rce removeChannelEvent) error {
return es.svc.RemoveChannel(rce.id)
}
func (es eventStore) handleConnectThing(rte connectThingEvent) error {
return es.svc.ConnectThing(rte.chanID, rte.thingID)
}
func (es eventStore) handleDisconnectThing(rte connectThingEvent) error {
return es.svc.DisconnectThing(rte.chanID, rte.thingID)
}
func read(event map[string]interface{}, key, def string) string {
val, ok := event[key].(string)
if !ok {

View File

@ -4,32 +4,21 @@
package opcua
import (
"context"
"errors"
"fmt"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
)
const (
protocol = "opcua"
thingSuffix = "thing"
channelSuffix = "channel"
)
const protocol = "opcua"
var (
// ErrMalformedIdentity indicates malformed identity received (e.g.
// invalid namespace or ID).
ErrMalformedIdentity = errors.New("malformed identity received")
// ErrMalformedMessage indicates malformed OPC-UA message.
ErrMalformedMessage = errors.New("malformed message received")
// ErrNotFoundIdentifier indicates a non-existent route map for a Node Identifier.
ErrNotFoundIdentifier = errors.New("route map not found for this node identifier")
// ErrNotFoundNamespace indicates a non-existent route map for an Node Namespace.
ErrNotFoundNamespace = errors.New("route map not found for this node namespace")
// ErrNotFoundServerURI indicates missing ServerURI route-map
ErrNotFoundServerURI = errors.New("route map not found for this Server URI")
// ErrNotFoundNodeID indicates missing NodeID route-map
ErrNotFoundNodeID = errors.New("route map not found for this Node ID")
// ErrNotFoundConn indicates missing connection
ErrNotFoundConn = errors.New("connection not found")
)
// Service specifies an API that must be fullfiled by the domain service
@ -44,96 +33,108 @@ type Service interface {
// RemoveThing removes thing mfx:opc & opc:mfx route-map
RemoveThing(string) error
// CreateChannel creates channel mfx:opc & opc:mfx route-map
// CreateChannel creates channel route-map
CreateChannel(string, string) error
// UpdateChannel updates mfx:opc & opc:mfx route-map
// UpdateChannel updates chroute-map
UpdateChannel(string, string) error
// RemoveChannel removes channel mfx:opc & opc:mfx route-map
// RemoveChannel removes channel route-map
RemoveChannel(string) error
// Publish forwards messages from the OPC-UA MQTT broker to Mainflux NATS broker
Publish(context.Context, string, Message) error
// ConnectThing creates thing and channel connection route-map
ConnectThing(string, string) error
// DisconnectThing removes thing and channel connection route-map
DisconnectThing(string, string) error
// Subscribe subscribes to a given OPC-UA server
Subscribe(Config) error
}
// Config OPC-UA Server
type Config struct {
ServerURI string
NodeNamespace string
NodeIdentifier string
NodeIdentifierType string
Policy string
Mode string
CertFile string
KeyFile string
ServerURI string
NodeID string
Policy string
Mode string
CertFile string
KeyFile string
}
var _ Service = (*adapterService)(nil)
type adapterService struct {
publisher mainflux.MessagePublisher
subscriber Subscriber
thingsRM RouteMapRepository
channelsRM RouteMapRepository
connectRM RouteMapRepository
cfg Config
logger logger.Logger
}
// New instantiates the OPC-UA adapter implementation.
func New(pub mainflux.MessagePublisher, thingsRM, channelsRM RouteMapRepository) Service {
func New(sub Subscriber, thingsRM, channelsRM, connectRM RouteMapRepository, cfg Config, log logger.Logger) Service {
return &adapterService{
publisher: pub,
subscriber: sub,
thingsRM: thingsRM,
channelsRM: channelsRM,
connectRM: connectRM,
cfg: cfg,
logger: log,
}
}
// Publish forwards messages from OPC-UA MQTT broker to Mainflux NATS broker
func (as *adapterService) Publish(ctx context.Context, token string, m Message) error {
// Get route map of OPC-UA Node Namespace
channelID, err := as.channelsRM.Get(m.Namespace)
if err != nil {
return ErrNotFoundNamespace
}
// Get route map of OPC-UA Node Identifier
thingID, err := as.thingsRM.Get(m.ID)
if err != nil {
return ErrNotFoundIdentifier
}
// Publish on Mainflux NATS broker
SenML := fmt.Sprintf(`[{"n":"opcua","v":%v}]`, m.Data)
payload := []byte(SenML)
msg := mainflux.Message{
Publisher: thingID,
Protocol: protocol,
ContentType: "Content-Type",
Channel: channelID,
Payload: payload,
}
return as.publisher.Publish(ctx, token, msg)
func (as *adapterService) CreateThing(mfxDevID, opcuaNodeID string) error {
return as.thingsRM.Save(mfxDevID, opcuaNodeID)
}
func (as *adapterService) CreateThing(mfxDevID string, opcID string) error {
return as.thingsRM.Save(mfxDevID, opcID)
}
func (as *adapterService) UpdateThing(mfxDevID string, opcID string) error {
return as.thingsRM.Save(mfxDevID, opcID)
func (as *adapterService) UpdateThing(mfxDevID, opcuaNodeID string) error {
return as.thingsRM.Save(mfxDevID, opcuaNodeID)
}
func (as *adapterService) RemoveThing(mfxDevID string) error {
return as.thingsRM.Remove(mfxDevID)
}
func (as *adapterService) CreateChannel(mfxChanID string, opcNamespace string) error {
return as.channelsRM.Save(mfxChanID, opcNamespace)
func (as *adapterService) CreateChannel(mfxChanID, opcuaServerURI string) error {
return as.channelsRM.Save(mfxChanID, opcuaServerURI)
}
func (as *adapterService) UpdateChannel(mfxChanID string, opcNamespace string) error {
return as.channelsRM.Save(mfxChanID, opcNamespace)
func (as *adapterService) UpdateChannel(mfxChanID, opcuaServerURI string) error {
return as.channelsRM.Save(mfxChanID, opcuaServerURI)
}
func (as *adapterService) RemoveChannel(mfxChanID string) error {
return as.channelsRM.Remove(mfxChanID)
}
func (as *adapterService) ConnectThing(mfxChanID, mfxThingID string) error {
serverURI, err := as.channelsRM.Get(mfxChanID)
if err != nil {
return err
}
nodeID, err := as.thingsRM.Get(mfxThingID)
if err != nil {
return err
}
as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI
go as.subscriber.Subscribe(as.cfg)
c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID)
return as.connectRM.Save(c, c)
}
func (as *adapterService) DisconnectThing(mfxChanID, mfxThingID string) error {
c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID)
return as.connectRM.Remove(c)
}
// Subscribe subscribes to the OPC-UA Server.
func (as *adapterService) Subscribe(cfg Config) error {
go as.subscriber.Subscribe(cfg)
return nil
}