Skip to content

OPC UA/DA 原理与生态

OPC 协议演进

OPC(OLE for Process Control)是工业自动化领域的标准通信接口,由 OPC 基金会维护。

OPC 协议演进:

OPC Classic(1996-2006):
├── OPC DA(Data Access)    → 实时数据读写
├── OPC HDA(Historical Data Access)→ 历史数据
├── OPC A&E(Alarms & Events)→ 报警事件
└── 基于 DCOM/COM,仅支持 Windows

OPC UA(Unified Architecture,2008-至今):
├── 跨平台(Windows/Linux/嵌入式)
├── 内置安全(TLS + 证书认证)
├── 信息模型(面向对象,可扩展)
├── 多种传输(TCP/HTTPS/WebSocket)
└── 向下兼容 OPC Classic(通过 Wrapper)

OPC UA 架构

OPC UA 架构层次:

应用层
├── 信息模型(Information Model)
│   ├── 节点(Node):变量、对象、方法、视图
│   ├── 引用(Reference):节点间关系
│   └── 地址空间(Address Space):节点树

├── 服务集(Service Sets)
│   ├── 发现服务(Discovery)
│   ├── 会话服务(Session)
│   ├── 节点管理(NodeManagement)
│   ├── 读写服务(Read/Write)
│   ├── 订阅服务(Subscription)
│   └── 方法调用(Method)

传输层
├── OPC UA TCP(opc.tcp://)→ 二进制编码,高效
├── HTTPS(https://)→ 基于 HTTP,防火墙友好
└── WebSocket(opc.wss://)→ 浏览器支持

信息模型

节点类型

OPC UA 节点类(NodeClass):

Object(对象):
  代表现实世界的实体,如设备、系统
  例:BMS_Rack01, PCS_Unit01

Variable(变量):
  包含数据值,有 DataType、Value、StatusCode、Timestamps
  例:BMS_Rack01.SOC, PCS_Unit01.ActivePower

Method(方法):
  可调用的操作
  例:PCS_Unit01.Start(), PCS_Unit01.SetPower(value)

ObjectType(对象类型):
  对象的模板/类定义
  例:BatteryRackType, PCSType

VariableType(变量类型):
  变量的类型定义

DataType(数据类型):
  Boolean, Int16, Int32, Float, Double, String, DateTime...

View(视图):
  地址空间的子集,用于特定用途的过滤视图

地址空间示例

Root
└── Objects
    └── EnergyStorageSystem
        ├── Site001
        │   ├── BMS
        │   │   ├── Rack001
        │   │   │   ├── SOC: Float = 85.3
        │   │   │   ├── Voltage: Float = 384.5
        │   │   │   ├── Current: Float = -50.2
        │   │   │   ├── Temperature_Max: Float = 32.1
        │   │   │   └── Cells[1..96]
        │   │   │       └── Cell001
        │   │   │           ├── Voltage: Float = 4.012
        │   │   │           └── Temperature: Float = 31.8
        │   │   └── Rack002
        │   │       └── ...
        │   └── PCS
        │       ├── Unit001
        │       │   ├── ActivePower: Float = -100.0
        │       │   ├── ReactivePower: Float = 0.0
        │       │   ├── Status: Int32 = 2
        │       │   ├── Start: Method
        │       │   ├── Stop: Method
        │       │   └── SetActivePower: Method(value: Float)
        │       └── Unit002
        └── Site002

订阅与监控

订阅机制

python
from asyncua import Client, ua
import asyncio

async def subscribe_to_ess_data():
    async with Client("opc.tcp://ess-server:4840") as client:

        # 创建订阅(发布间隔 1000ms)
        subscription = await client.create_subscription(
            period=1000,  # 毫秒
            handler=DataChangeHandler()
        )

        # 监控节点列表
        nodes_to_monitor = [
            client.get_node("ns=2;s=Site001.BMS.Rack001.SOC"),
            client.get_node("ns=2;s=Site001.BMS.Rack001.Voltage"),
            client.get_node("ns=2;s=Site001.PCS.Unit001.ActivePower"),
        ]

        # 创建监控项(采样间隔 500ms,队列大小 10)
        handles = await subscription.subscribe_data_change(
            nodes_to_monitor,
            attr=ua.AttributeIds.Value,
            queuesize=10
        )

        # 保持运行
        await asyncio.sleep(3600)

        # 清理
        await subscription.unsubscribe(handles)
        await subscription.delete()

class DataChangeHandler:
    def datachange_notification(self, node, val, data):
        print(f"Node: {node}, Value: {val}, "
              f"Status: {data.monitored_item.Value.StatusCode}, "
              f"Timestamp: {data.monitored_item.Value.SourceTimestamp}")

    def event_notification(self, event):
        print(f"Event: {event}")

历史数据读取

python
async def read_historical_data(client, node_id: str,
                                start_time, end_time,
                                max_values: int = 1000):
    node = client.get_node(node_id)

    # 读取历史数据
    history = await node.read_raw_history(
        starttime=start_time,
        endtime=end_time,
        numvalues=max_values
    )

    results = []
    for data_value in history:
        results.append({
            'timestamp': data_value.SourceTimestamp,
            'value': data_value.Value.Value,
            'status': str(data_value.StatusCode)
        })

    return results

安全机制

安全模式

OPC UA 安全策略:

None(无安全):
  - 无加密,无签名
  - 仅用于测试/内网隔离环境

Basic256Sha256(推荐):
  - 消息签名:SHA-256
  - 消息加密:AES-256-CBC
  - 密钥交换:RSA-2048

Aes128_Sha256_RsaOaep:
  - 更现代的加密套件

安全模式:
  None:无保护
  Sign:仅签名,不加密
  SignAndEncrypt:签名+加密(推荐生产环境)
python
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua import Client
import asyncio

async def connect_secure():
    client = Client("opc.tcp://ess-server:4840")

    # 配置安全策略
    await client.set_security(
        SecurityPolicyBasic256Sha256,
        certificate="client_cert.pem",
        private_key="client_key.pem",
        server_certificate="server_cert.pem",
        mode=ua.MessageSecurityMode.SignAndEncrypt
    )

    await client.connect()
    return client

OPC UA 服务器实现

python
from asyncua import Server, ua
from asyncua.common.methods import uamethod
import asyncio

async def create_ess_server():
    server = Server()
    await server.init()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/ess")
    server.set_server_name("Energy Storage System OPC UA Server")

    # 注册命名空间
    uri = "http://example.com/ess"
    idx = await server.register_namespace(uri)

    # 创建对象结构
    objects = server.get_objects_node()
    ess = await objects.add_object(idx, "EnergyStorageSystem")
    site = await ess.add_object(idx, "Site001")
    bms = await site.add_object(idx, "BMS")
    rack = await bms.add_object(idx, "Rack001")

    # 添加变量
    soc_var = await rack.add_variable(idx, "SOC", 0.0)
    await soc_var.set_writable(False)  # 只读

    voltage_var = await rack.add_variable(idx, "Voltage", 0.0)

    # 添加方法
    pcs = await site.add_object(idx, "PCS")
    unit = await pcs.add_object(idx, "Unit001")

    @uamethod
    async def set_active_power(parent, power: float) -> bool:
        """设置有功功率"""
        if -1000 <= power <= 1000:
            # 执行实际控制...
            return True
        return False

    await unit.add_method(
        idx, "SetActivePower",
        set_active_power,
        [ua.VariantType.Float],   # 输入参数类型
        [ua.VariantType.Boolean]  # 输出参数类型
    )

    async with server:
        # 定期更新数据
        while True:
            soc = read_bms_soc()
            await soc_var.write_value(soc)
            await asyncio.sleep(1)

与 MQTT 集成(OPC UA PubSub)

OPC UA PubSub(OPC UA 1.04 新增):

传统 Client-Server 模式:
  Client ──请求──► Server ──响应──► Client

PubSub 模式(类似 MQTT):
  Publisher ──发布──► Message Broker ──推送──► Subscriber

支持的传输:
├── MQTT(最常用)
├── AMQP
└── UDP 组播

消息格式:
├── JSON(可读性好)
└── UADP(二进制,高效)
python
# OPC UA over MQTT 配置示例
# 将 OPC UA 数据发布到 MQTT Broker

from asyncua.pubsub import PubSubManager

pubsub = PubSubManager(server)
connection = pubsub.add_connection(
    "mqtt://broker:1883",
    transport="mqtt"
)

writer_group = connection.add_writer_group(
    publishing_interval=1000  # 1秒发布间隔
)

dataset_writer = writer_group.add_dataset_writer(
    dataset_name="BMS_Data",
    topic="ess/site001/bms/data"
)

# 添加要发布的变量
dataset_writer.add_variable(soc_var)
dataset_writer.add_variable(voltage_var)

生态工具

开发库:
├── Python: asyncua(推荐), opcua
├── Java:   Eclipse Milo(推荐), Prosys SDK
├── C/C++:  open62541(开源,嵌入式友好)
├── .NET:   OPC Foundation SDK, UA-.NETStandard
└── Go:     gopcua

客户端工具:
├── UaExpert        → 免费 GUI 客户端(推荐)
├── Prosys OPC UA Browser → 功能完整
├── FreeOpcUa       → 开源 Python 工具
└── opcua-commander → 命令行工具

服务器/网关:
├── Kepware KEPServerEX → 工业协议网关(商业)
├── Ignition           → 工业 SCADA 平台
├── Node-RED           → node-red-contrib-opcua
├── Neuron             → 开源工业协议网关
└── open62541-server   → 嵌入式 OPC UA 服务器

褚成志的IoT笔记