Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion plugins/common/opcua/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package opcua

import (
"context"
"errors"
"fmt"
"log" //nolint:depguard // just for debug
"net/url"
Expand Down Expand Up @@ -184,7 +185,8 @@ type OpcUAClient struct {
Config *OpcUAClientConfig
Log telegraf.Logger

Client *opcua.Client
Client *opcua.Client
namespaceArray []string

opts []opcua.Option
codes []ua.StatusCode
Expand Down Expand Up @@ -327,3 +329,53 @@ func (o *OpcUAClient) State() ConnectionState {
}
return ConnectionState(o.Client.State())
}

// UpdateNamespaceArray fetches the namespace array from the OPC UA server
// The namespace array is stored at the well-known node ns=0;i=2255
func (o *OpcUAClient) UpdateNamespaceArray(ctx context.Context) error {
if o.Client == nil {
return errors.New("client not connected")
}

nodeID := ua.NewNumericNodeID(0, 2255)
req := &ua.ReadRequest{
MaxAge: 2000,
NodesToRead: []*ua.ReadValueID{
{NodeID: nodeID},
},
TimestampsToReturn: ua.TimestampsToReturnBoth,
}

resp, err := o.Client.Read(ctx, req)
if err != nil {
return fmt.Errorf("failed to read namespace array: %w", err)
}

if len(resp.Results) == 0 {
return errors.New("no results returned when reading namespace array")
}

result := resp.Results[0]
if result.Status != ua.StatusOK {
return fmt.Errorf("failed to read namespace array, status: %w", result.Status)
}

if result.Value == nil {
return errors.New("namespace array value is nil")
}

// The namespace array is an array of strings
namespaces, ok := result.Value.Value().([]string)
if !ok {
return fmt.Errorf("namespace array is not a string array, got type: %T", result.Value.Value())
}

o.namespaceArray = namespaces
o.Log.Debugf("Fetched namespace array with %d entries", len(namespaces))
return nil
}

// NamespaceArray returns the cached namespace array
func (o *OpcUAClient) NamespaceArray() []string {
return o.namespaceArray
}
130 changes: 114 additions & 16 deletions plugins/common/opcua/input/input_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type MonitoringParameters struct {
type NodeSettings struct {
FieldName string `toml:"name"`
Namespace string `toml:"namespace"`
NamespaceURI string `toml:"namespace_uri"`
IdentifierType string `toml:"identifier_type"`
Identifier string `toml:"identifier"`
DefaultTags map[string]string `toml:"default_tags"`
Expand All @@ -59,13 +60,17 @@ type NodeSettings struct {

// NodeID returns the OPC UA node id
func (tag *NodeSettings) NodeID() string {
if tag.NamespaceURI != "" {
return "nsu=" + tag.NamespaceURI + ";" + tag.IdentifierType + "=" + tag.Identifier
}
return "ns=" + tag.Namespace + ";" + tag.IdentifierType + "=" + tag.Identifier
}

// NodeGroupSettings describes a mapping of group of nodes to Metrics
type NodeGroupSettings struct {
MetricName string `toml:"name"` // Overrides plugin's setting
Namespace string `toml:"namespace"` // Can be overridden by node setting
NamespaceURI string `toml:"namespace_uri"` // Can be overridden by node setting
IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting
Nodes []NodeSettings `toml:"nodes"`
DefaultTags map[string]string `toml:"default_tags"`
Expand All @@ -74,11 +79,15 @@ type NodeGroupSettings struct {

type EventNodeSettings struct {
Namespace string `toml:"namespace"`
NamespaceURI string `toml:"namespace_uri"`
IdentifierType string `toml:"identifier_type"`
Identifier string `toml:"identifier"`
}

func (e *EventNodeSettings) NodeID() string {
if e.NamespaceURI != "" {
return "nsu=" + e.NamespaceURI + ";" + e.IdentifierType + "=" + e.Identifier
}
return "ns=" + e.Namespace + ";" + e.IdentifierType + "=" + e.Identifier
}

Expand All @@ -87,6 +96,7 @@ type EventGroupSettings struct {
QueueSize uint32 `toml:"queue_size"`
EventTypeNode EventNodeSettings `toml:"event_type_node"`
Namespace string `toml:"namespace"`
NamespaceURI string `toml:"namespace_uri"`
IdentifierType string `toml:"identifier_type"`
NodeIDSettings []EventNodeSettings `toml:"node_ids"`
SourceNames []string `toml:"source_names"`
Expand All @@ -99,6 +109,9 @@ func (e *EventGroupSettings) UpdateNodeIDSettings() {
if n.Namespace == "" {
n.Namespace = e.Namespace
}
if n.NamespaceURI == "" {
n.NamespaceURI = e.NamespaceURI
}
if n.IdentifierType == "" {
n.IdentifierType = e.IdentifierType
}
Expand Down Expand Up @@ -138,11 +151,23 @@ func (e EventNodeSettings) validateEventNodeSettings() error {
}
if e.Identifier == "" {
return errors.New("identifier must be set")
} else if e.IdentifierType == "" {
}
if e.IdentifierType == "" {
return errors.New("identifier_type must be set")
} else if e.Namespace == "" {
return errors.New("namespace must be set")
}

// Validate namespace configuration
hasNamespace := len(e.Namespace) > 0
hasNamespaceURI := len(e.NamespaceURI) > 0

if hasNamespace && hasNamespaceURI {
return errors.New("cannot specify both 'namespace' and 'namespace_uri', use only one")
}

if !hasNamespace && !hasNamespaceURI {
return errors.New("must specify either 'namespace' or 'namespace_uri'")
}

return nil
}

Expand Down Expand Up @@ -334,8 +359,16 @@ func validateNodeToAdd(existing map[metricParts]struct{}, nmm *NodeMetricMapping
return fmt.Errorf("empty name in %q", nmm.Tag.FieldName)
}

if len(nmm.Tag.Namespace) == 0 {
return errors.New("empty node namespace not allowed")
// Validate namespace configuration
hasNamespace := len(nmm.Tag.Namespace) > 0
hasNamespaceURI := len(nmm.Tag.NamespaceURI) > 0

if hasNamespace && hasNamespaceURI {
return fmt.Errorf("node %q: cannot specify both 'namespace' and 'namespace_uri', use only one", nmm.Tag.FieldName)
}

if !hasNamespace && !hasNamespaceURI {
return fmt.Errorf("node %q: must specify either 'namespace' or 'namespace_uri'", nmm.Tag.FieldName)
}

if len(nmm.Tag.Identifier) == 0 {
Expand Down Expand Up @@ -396,6 +429,9 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error {
if node.Namespace == "" {
node.Namespace = group.Namespace
}
if node.NamespaceURI == "" {
node.NamespaceURI = group.NamespaceURI
}
if node.IdentifierType == "" {
node.IdentifierType = group.IdentifierType
}
Expand All @@ -420,29 +456,91 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error {

func (o *OpcUAInputClient) InitNodeIDs() error {
o.NodeIDs = make([]*ua.NodeID, 0, len(o.NodeMetricMapping))
namespaceArray := o.NamespaceArray()

for _, node := range o.NodeMetricMapping {
nid, err := ua.ParseNodeID(node.Tag.NodeID())
if err != nil {
return err
nodeIDStr := node.Tag.NodeID()

// Check if this uses namespace URI (nsu=) format
if strings.HasPrefix(nodeIDStr, "nsu=") {
// Namespace URI format requires namespace array
if len(namespaceArray) == 0 {
return fmt.Errorf("node ID %q uses namespace URI (nsu=) but namespace array is not available - connection to server may be required", nodeIDStr)
}
// Use ParseExpandedNodeID for namespace URI support
expandedNodeID, err := ua.ParseExpandedNodeID(nodeIDStr, namespaceArray)
if err != nil {
return fmt.Errorf("failed to parse node ID %q: %w", nodeIDStr, err)
}
o.NodeIDs = append(o.NodeIDs, expandedNodeID.NodeID)
} else {
// Use ParseNodeID for namespace index (ns=) format
nid, err := ua.ParseNodeID(nodeIDStr)
if err != nil {
return fmt.Errorf("failed to parse node ID %q: %w", nodeIDStr, err)
}
o.NodeIDs = append(o.NodeIDs, nid)
}
o.NodeIDs = append(o.NodeIDs, nid)
}

return nil
}

func (o *OpcUAInputClient) InitEventNodeIDs() error {
namespaceArray := o.NamespaceArray()

for _, eventSetting := range o.EventGroups {
eid, err := ua.ParseNodeID(eventSetting.EventTypeNode.NodeID())
if err != nil {
return err
eventTypeNodeIDStr := eventSetting.EventTypeNode.NodeID()
var eid *ua.NodeID

// Parse event type node ID
if strings.HasPrefix(eventTypeNodeIDStr, "nsu=") {
if len(namespaceArray) == 0 {
return fmt.Errorf(
"event type node ID %q uses namespace URI (nsu=) but namespace array is not available - "+
"connection to server may be required",
eventTypeNodeIDStr,
)
}
expandedNodeID, err := ua.ParseExpandedNodeID(eventTypeNodeIDStr, namespaceArray)
if err != nil {
return fmt.Errorf("failed to parse event type node ID %q: %w", eventTypeNodeIDStr, err)
}
eid = expandedNodeID.NodeID
} else {
parsedID, err := ua.ParseNodeID(eventTypeNodeIDStr)
if err != nil {
return fmt.Errorf("failed to parse event type node ID %q: %w", eventTypeNodeIDStr, err)
}
eid = parsedID
}
for _, node := range eventSetting.NodeIDSettings {
nid, err := ua.ParseNodeID(node.NodeID())

if err != nil {
return err
for _, node := range eventSetting.NodeIDSettings {
nodeIDStr := node.NodeID()
var nid *ua.NodeID

// Parse node ID
if strings.HasPrefix(nodeIDStr, "nsu=") {
if len(namespaceArray) == 0 {
return fmt.Errorf(
"event node ID %q uses namespace URI (nsu=) but namespace array is not available - "+
"connection to server may be required",
nodeIDStr,
)
}
expandedNodeID, err := ua.ParseExpandedNodeID(nodeIDStr, namespaceArray)
if err != nil {
return fmt.Errorf("failed to parse node ID %q: %w", nodeIDStr, err)
}
nid = expandedNodeID.NodeID
} else {
parsedID, err := ua.ParseNodeID(nodeIDStr)
if err != nil {
return fmt.Errorf("failed to parse node ID %q: %w", nodeIDStr, err)
}
nid = parsedID
}

nmm := EventNodeMetricMapping{
NodeID: nid,
SamplingInterval: &eventSetting.SamplingInterval,
Expand Down
Loading
Loading