Skip to content

OPC UA 最佳实践

地址空间设计

层级结构规范

推荐的地址空间组织:

Root
└── Objects
    └── {CompanyName}
        └── {SystemType}
            └── {SiteID}
                └── {SubsystemType}
                    └── {DeviceID}
                        └── {DataPoint}

示例:
Root/Objects/ACME/ESS/Site001/BMS/Rack01/SOC

节点命名规范

python
# Node naming best practices
# NOTE(review): illustrative pseudo-code -- the bare strings are example names
# and `node` is not defined anywhere in this snippet.

# ✅ Recommended: clear, consistent, extensible
"BMS_Rack01_SOC"
"PCS_Unit01_ActivePower_kW"
"Temperature_Cell_001_Celsius"

# ❌ Avoid: unclear abbreviations, special characters, spaces
"bmsr1soc"
"PCS#1 Power"
"温度-电芯-001"

# Keep BrowseName and DisplayName separate
node.BrowseName = "SOC"  # used for programmatic access
node.DisplayName = "State of Charge (%)"  # human-readable

性能优化

批量读写

python
# ❌ Inefficient: reads the nodes one at a time
async def bad_read(client, node_ids):
    """Read every node in ``node_ids`` sequentially and return the values.

    Kept as the counter-example: each node is awaited on its own, so the
    reads are issued one after another, in input order.
    """
    return [await client.get_node(nid).read_value() for nid in node_ids]

# ✅ Efficient: batch read
async def good_read(client, node_ids):
    """Resolve all ``node_ids`` to nodes and read them with one ``read_values`` call."""
    targets = list(map(client.get_node, node_ids))
    return await client.read_values(targets)

订阅优化

python
# 合理配置订阅参数
subscription = await client.create_subscription(
    period=1000,           # 发布间隔:1秒
    handler=handler,
    publishing_enabled=True,
    max_notifications=0,   # 0=无限制
    priority=0,
    lifetime_count=10000,  # 生命周期:10000 * period
    max_keepalive_count=3000
)

# 监控项采样间隔应 ≤ 订阅发布间隔
await subscription.subscribe_data_change(
    nodes,
    sampling_interval=500,  # 500ms 采样
    queuesize=10           # 队列大小:防止数据丢失
)

安全配置

证书生成

bash
# Generate the CA private key and a self-signed CA certificate
openssl genrsa -out ca_key.pem 4096
openssl req -new -x509 -days 3650 -key ca_key.pem -out ca_cert.pem \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=ACME/CN=ACME Root CA"

# Generate the server private key and certificate signing request
openssl genrsa -out server_key.pem 2048
openssl req -new -key server_key.pem -out server.csr \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=ACME/CN=ess-server"

# Sign the server certificate with the CA.
# OPC UA application instance certificates need more than a subjectAltName:
# the key usage / extended key usage extensions below are required by the
# specification, and a CA-signed application certificate must not be a CA.
# The URI entry must match the server's application URI.
openssl x509 -req -days 3650 -in server.csr \
  -CA ca_cert.pem -CAkey ca_key.pem -CAcreateserial \
  -out server_cert.pem \
  -extensions v3_req -extfile <(cat <<EOF
[v3_req]
basicConstraints = CA:FALSE
keyUsage = critical,digitalSignature,nonRepudiation,keyEncipherment,dataEncipherment
extendedKeyUsage = serverAuth,clientAuth
subjectAltName = URI:urn:ess-server:opcua:server,DNS:ess-server,IP:192.168.1.100
EOF
)

# Generate the client certificate the same way: its own key and CSR, signed by
# the same CA, with the client's application URI in subjectAltName.

用户认证

python
# Server side: configure user authentication.
# asyncua authenticates a session by calling UserManager.get_user(); returning a
# User accepts the session and returning None rejects it.
import hmac

from asyncua import Server
from asyncua.server.user_managers import UserManager
from asyncua.server.users import User, UserRole


def _as_bytes(value):
    """Return ``value`` as UTF-8 bytes so str and bytes credentials compare alike."""
    if value is None:
        return b""
    if isinstance(value, bytes):
        return value
    return str(value).encode("utf-8")


class CustomUserManager(UserManager):
    """User manager that accepts a single username/password pair.

    The credentials are hard-coded only to keep the example short; load them
    from a secret store or a hashed credential database in a real deployment.
    """

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """Return an admin ``User`` for valid credentials, otherwise ``None``."""
        # Constant-time comparison so response timing does not leak how much of
        # the credential matched. Both checks always run.
        name_ok = hmac.compare_digest(_as_bytes(username), b"admin")
        password_ok = hmac.compare_digest(_as_bytes(password), b"secret")
        if name_ok and password_ok:
            return User(role=UserRole.Admin)
        return None


# The user manager has to be passed to the constructor: it is handed to the
# internal server when the Server object is built, so assigning an attribute
# on an existing server afterwards is not picked up.
server = Server(user_manager=CustomUserManager())

# Client side: authenticate with username and password
client.set_user("admin")
client.set_password("secret")

数据建模

自定义对象类型

python
# Define the BatteryRack object type
async def create_battery_rack_type(server, idx):
    """Create the ``BatteryRackType`` object type and return its node.

    Args:
        server: Object exposing ``get_node``; used to look up ``BaseObjectType``.
        idx: Namespace index under which the type and its variables are added.

    Returns:
        The node of the newly created ``BatteryRackType``.
    """
    base_object_type = server.get_node(ua.ObjectIds.BaseObjectType)

    rack_type = await base_object_type.add_object_type(idx, "BatteryRackType")

    # Add the mandatory member variables. The Mandatory modelling rule has to be
    # set explicitly: members without a modelling rule are skipped when the type
    # is instantiated, so objects of this type would otherwise have no variables.
    for name in ("SOC", "Voltage", "Current", "Temperature_Max"):
        variable = await rack_type.add_variable(idx, name, 0.0, ua.VariantType.Float)
        await variable.set_modelling_rule(True)

    return rack_type

# Instantiate an object of the new type
rack_type = await create_battery_rack_type(server, idx)
rack01 = await bms.add_object(idx, "Rack01", objecttype=rack_type.nodeid)

故障恢复

python
class RobustOPCUAClient:
    """OPC UA client wrapper that retries connecting and reading.

    ``client`` holds the connected ``Client`` or ``None`` when there is no
    usable connection; ``reconnect_delay`` is the pause in seconds between
    connection attempts.
    """

    # Exceptions treated as a broken transport: after one of these the current
    # client is discarded and the next read attempt reconnects.
    # NOTE(review): presumably this is how asyncua surfaces lost connections and
    # request timeouts -- confirm against the library version in use.
    _CONNECTION_ERRORS = (OSError, asyncio.TimeoutError)

    def __init__(self, url):
        self.url = url
        self.client = None
        self.reconnect_delay = 5

    async def connect(self, max_attempts=None):
        """Connect to ``self.url``, retrying every ``reconnect_delay`` seconds.

        Args:
            max_attempts: Maximum number of connection attempts. ``None``
                (the default) retries forever.

        Raises:
            Exception: The error of the last attempt once ``max_attempts``
                is exhausted.
        """
        attempts = 0
        while True:
            attempts += 1
            candidate = Client(self.url)
            try:
                await candidate.connect()
            except Exception as e:
                if max_attempts is not None and attempts >= max_attempts:
                    raise
                logging.error(
                    "Connect failed: %s, retry in %ss", e, self.reconnect_delay
                )
                await asyncio.sleep(self.reconnect_delay)
            else:
                # Publish the client only once it is connected, so self.client
                # never refers to a half-initialised instance.
                self.client = candidate
                logging.info("OPC UA connected")
                return

    async def read_with_retry(self, node_id, max_retries=3, retry_delay=1):
        """Read the value of ``node_id``, retrying on failure.

        A missing or broken connection is re-established (one connection
        attempt per read attempt) before the read is retried.

        Args:
            node_id: Identifier passed to ``client.get_node``.
            max_retries: Total number of read attempts; must be at least 1.
            retry_delay: Seconds to wait between attempts.

        Returns:
            The value returned by the node's ``read_value``.

        Raises:
            ValueError: If ``max_retries`` is smaller than 1.
            Exception: The error of the last attempt when all attempts fail.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        for attempt in range(1, max_retries + 1):
            try:
                if self.client is None:
                    await self.connect(max_attempts=1)
                node = self.client.get_node(node_id)
                return await node.read_value()
            except Exception as exc:
                if isinstance(exc, self._CONNECTION_ERRORS):
                    # Retrying on a dead session cannot succeed; drop it so the
                    # next attempt (or the next call) reconnects.
                    await self._drop_client()
                if attempt == max_retries:
                    raise
                logging.warning("Read failed, retry %d/%d", attempt, max_retries)
                await asyncio.sleep(retry_delay)

    async def _drop_client(self):
        """Forget the current client, closing it on a best-effort basis."""
        stale, self.client = self.client, None
        if stale is None:
            return
        try:
            await stale.disconnect()
        except Exception:
            # The connection is already considered broken; a failing close
            # must not mask the original read error.
            logging.debug("Ignoring error while closing stale client", exc_info=True)

褚成志的IoT笔记