OPC-UA 与 Python 在数组中使用结构时遇到麻烦

问题描述 投票:0回答:1

我在 Python 中运行一个简单的 OPC-UA 服务器。我的问题是:我需要读取变量“astParams”(它是由 20 个 ST_NameValue 结构组成的数组),以便在我的 OPC-UA 方法中使用它。但我不知道该如何创建这样的 OPC-UA 变量。

import time
import subprocess
import sys
sys.path.insert(0, "..")
from opcua import ua, Server, uamethod
import threading

# Définir la structure ST_NameValue
class ST_NameValue:
    """Simple name/value pair.

    The two fields are stored under capitalised attribute names,
    ``Name`` and ``Value``; both default to the empty string.
    """

    def __init__(self, name="", value=""):
        self.Name, self.Value = name, value

def lire_adc():
    """Return the ADC reading; this stub always answers the constant 100."""
    valeur_simulee = 100
    return valeur_simulee

def set_line_state(gpiochip, toto, tata):
    """Stub for setting a GPIO line: it only logs that it was called.

    None of the three arguments is used here. Callers in this file pass a
    chip identifier, a line number and a 0/1 state, in that order.
    """
    message = "set_line_state called"
    print(message)

def indexeurMoveDown(choixIndexeur):
    """Drive the selected indexer in direction 1 and wait for the ADC to drop.

    The direction-1 line is enabled and the direction-2 line disabled, the
    function sleeps 0.5 s, then polls ``lire_adc()`` for as long as the
    reading stays above 120, and finally disables the direction-1 line.

    Args:
        choixIndexeur: indexer number, one of 1, 2 or 3.

    Raises:
        ValueError: if ``choixIndexeur`` is not 1, 2 or 3. The check runs
            before the ADC is read or any GPIO line is touched, instead of
            failing later on unbound local variables.
    """
    # (chip for direction 1, line for direction 1,
    #  chip for direction 2, line for direction 2)
    # NOTE(review): chips are given by name ("gpiochip2") here, whereas
    # indexeurMoveUp passes bare integers for what look like the same
    # chips -- confirm which form set_line_state expects.
    lignes_par_indexeur = {
        1: ("gpiochip2", 15, "gpiochip2", 3),
        2: ("gpiochip3", 25, "gpiochip0", 11),
        3: ("gpiochip0", 8, "gpiochip0", 9),
    }
    try:
        (gpiochipNbrSens1, lineNbrSens1,
         gpiochipNbrSens2, lineNbrSens2) = lignes_par_indexeur[choixIndexeur]
    except (KeyError, TypeError):
        raise ValueError(
            f"choixIndexeur must be 1, 2 or 3, got {choixIndexeur!r}"
        ) from None

    valeur_adc = lire_adc()
    print("Valeur ADC avant déplacement: ", valeur_adc)

    # Direction of travel
    set_line_state(gpiochipNbrSens1, lineNbrSens1, 1)  # e.g. indexer 1: GPIO_1_P_3.3V -> enable direction 1
    set_line_state(gpiochipNbrSens2, lineNbrSens2, 0)  # e.g. indexer 1: GPIO_1_N_3.3V -> disable direction 2

    time.sleep(0.5)  # simulated travel time
    while valeur_adc > 120:
        valeur_adc = lire_adc()
    set_line_state(gpiochipNbrSens1, lineNbrSens1, 0)
    print("Valeur ADC après déplacement: ", valeur_adc)

def indexeurMoveUp(choixIndexeur):
    """Drive the selected indexer in direction 2 and wait for the ADC to rise.

    The direction-1 line is disabled and the direction-2 line enabled, then
    ``lire_adc()`` is polled for as long as the reading stays below 1560,
    and finally the direction-2 line is disabled.

    Args:
        choixIndexeur: indexer number, one of 1, 2 or 3.

    Raises:
        ValueError: if ``choixIndexeur`` is not 1, 2 or 3. The check runs
            before the ADC is read or any GPIO line is touched, instead of
            failing later on unbound local variables.
    """
    # (chip for direction 1, line for direction 1,
    #  chip for direction 2, line for direction 2)
    # NOTE(review): chips are bare integers here, whereas indexeurMoveDown
    # passes names such as "gpiochip2" -- confirm which form
    # set_line_state expects.
    lignes_par_indexeur = {
        1: (2, 15, 2, 3),
        2: (3, 25, 0, 11),
        3: (0, 8, 0, 9),
    }
    try:
        (gpiochipNbrSens1, lineNbrSens1,
         gpiochipNbrSens2, lineNbrSens2) = lignes_par_indexeur[choixIndexeur]
    except (KeyError, TypeError):
        raise ValueError(
            f"choixIndexeur must be 1, 2 or 3, got {choixIndexeur!r}"
        ) from None

    valeur_adc = lire_adc()
    print("Valeur ADC avant déplacement: ", valeur_adc)

    # Direction of travel
    set_line_state(gpiochipNbrSens1, lineNbrSens1, 0)  # e.g. indexer 1: GPIO_1_P_3.3V -> disable direction 1
    set_line_state(gpiochipNbrSens2, lineNbrSens2, 1)  # e.g. indexer 1: GPIO_1_N_3.3V -> enable direction 2

    # NOTE: with the stub lire_adc() in this file (constant 100) this loop
    # never terminates; it relies on a real ADC reading that rises.
    while valeur_adc < 1560:
        valeur_adc = lire_adc()
    set_line_state(gpiochipNbrSens2, lineNbrSens2, 0)

    print("Valeur ADC après déplacement: ", valeur_adc)
    
@uamethod
def mAckError(parent):
    """OPC-UA method: acknowledge an error by clearing the xError variable.

    Returns the acknowledgement string "xAckError".
    """
    print("mAckError called")
    xError_var.set_value(False)
    accuse = "xAckError"
    return accuse

@uamethod
def mInitialize(parent):
    """OPC-UA method: reset the status variables and start initialisation.

    Sets xBusy to False, xReady to True and xError to False, then runs
    thread_mInitialize in a background thread and returns "xAckInit"
    without waiting for it.
    """
    print("mInitialize called")
    # The indexer has to be lowered here, because an empty index is in the
    # low state. All OPC-UA variables must be initialised as well.
    xBusy_var.set_value(False)
    xReady_var.set_value(True)
    xError_var.set_value(False)
    threading.Thread(target=thread_mInitialize, args=(parent,)).start()
    return "xAckInit"

def thread_mInitialize(parent):
    """Background worker for mInitialize, meant to lower the indexer.

    For now it only flags the server busy / not ready and then immediately
    ready / not busy again; the actual movement is still to be written.
    """
    def _signaler(occupe):
        # xBusy and xReady are always written as opposites, busy first.
        xBusy_var.set_value(occupe)
        xReady_var.set_value(not occupe)

    _signaler(True)

    # TODO: indexeurMoveDown("all") needs rework, since every indexer has
    # to be sent down.

    _signaler(False)
     
@uamethod
def mReset(parent):
    """OPC-UA method stub for a reset request; it only logs the call."""
    print("mReset called")
    return None
@uamethod
def mSafetyStop(parent):
    """OPC-UA method stub for a safety stop; it only logs the call."""
    print("mSafetyStop called")
    return None

@uamethod
def mRetractUnload(parent):
    """OPC-UA method stub: log the call and return "xAckRetractUnload"."""
    print("mRetractUnload called")
    accuse = "xAckRetractUnload"
    return accuse

@uamethod
def mUnloadElement(parent):
    """OPC-UA method: start unloading an element in the background.

    Launches thread_unLoadElement in a new thread and returns
    "xAckUnloadElement" immediately, without waiting for the movement.
    """
    # Log this method's own name (the message previously said "mLoadElement").
    print("mUnloadElement called")
    # TODO: accept an input argument selecting which indexer(s) to move (1 to 3).
    thread = threading.Thread(target=thread_unLoadElement, args=(parent,))
    thread.start()
    return "xAckUnloadElement"

def thread_unLoadElement(parent):
    """Background worker for mUnloadElement.

    Flags the server busy, reads the astParams array from the server, looks
    up the entry named "choixIndexeur", moves that indexer down with
    indexeurMoveDown, then flags the server ready again.

    Raises:
        LookupError: if astParams has no entry named "choixIndexeur".
        ValueError: if that entry's value cannot be converted to an int.
    """
    xBusy_var.set_value(True)
    xReady_var.set_value(False)

    astParams_node_id = ua.NodeId("astParams", 10)
    astParams_var = server.get_node(astParams_node_id)
    astParams_array = astParams_var.get_value()
    print("astParams: "+str(astParams_array))
    # Scan the array for the entry named "choixIndexeur". The structure's
    # fields are capitalised (Name / Value), not name / value.
    for item in astParams_array:
        if item.Name == "choixIndexeur":
            choixIndexeur = item.Value
            break
    else:
        raise LookupError('astParams has no entry named "choixIndexeur"')

    # Value may arrive as a string (the structure defaults to ""), while
    # indexeurMoveDown selects on the integers 1-3.
    indexeurMoveDown(int(choixIndexeur))

    xBusy_var.set_value(False)
    xReady_var.set_value(True)

@uamethod
def mLoadElement(parent):
    """OPC-UA method: start loading an element in the background.

    Launches thread_LoadElement in a new thread and returns
    "xAckLoadElement" immediately, without waiting for the movement.
    """
    # Log this method's own name (the message previously said "mUnloadElement").
    print("mLoadElement called")
    thread = threading.Thread(target=thread_LoadElement, args=(parent,))
    thread.start()

    return "xAckLoadElement"

def thread_LoadElement(parent):
    """Background worker for mLoadElement.

    Flags the server busy, reads the astParams array from the server, looks
    up the entry named "choixIndexeur" and, when the ADC reading is above
    150, moves that indexer down with indexeurMoveDown. The server is then
    flagged ready again.

    Raises:
        LookupError: if astParams has no entry named "choixIndexeur".
        ValueError: if that entry's value cannot be converted to an int.
    """
    xBusy_var.set_value(True)
    xReady_var.set_value(False)

    astParams_node_id = ua.NodeId("astParams", 10)
    astParams_var = server.get_node(astParams_node_id)
    astParams_array = astParams_var.get_value()
    print("astParams: "+str(astParams_array))
    # Scan the array for the entry named "choixIndexeur". The structure's
    # fields are capitalised (Name / Value), not name / value.
    for item in astParams_array:
        if item.Name == "choixIndexeur":
            choixIndexeur = item.Value
            break
    else:
        raise LookupError('astParams has no entry named "choixIndexeur"')

    # lire_adc must be *called*; comparing the function object itself to an
    # int raises TypeError.
    # Original intent (translated): check that the indexer is in the low
    # position. NOTE(review): confirm the 150 threshold and its direction.
    if lire_adc() > 150:
        # Value may arrive as a string (the structure defaults to ""),
        # while indexeurMoveDown selects on the integers 1-3.
        indexeurMoveDown(int(choixIndexeur))

    xBusy_var.set_value(False)
    xReady_var.set_value(True)

@uamethod
def mRetractElement(parent):
    """OPC-UA method: start retracting an element in the background.

    Runs thread_RetractElement in a new thread and returns
    "xAckRetractElement" immediately, without waiting for the movement.
    """
    print("mRetractElement called")
    threading.Thread(target=thread_RetractElement, args=(parent,)).start()
    return "xAckRetractElement"
 
def thread_RetractElement(parent):
    """Background worker for mRetractElement.

    Flags the server busy, reads the astParams array from the server, looks
    up the entry named "choixIndexeur", moves that indexer up with
    indexeurMoveUp, then flags the server ready again.

    Raises:
        LookupError: if astParams has no entry named "choixIndexeur".
        ValueError: if that entry's value cannot be converted to an int.
    """
    xBusy_var.set_value(True)
    xReady_var.set_value(False)

    astParams_node_id = ua.NodeId("astParams", 10)
    astParams_var = server.get_node(astParams_node_id)
    astParams_array = astParams_var.get_value()
    print("astParams: "+str(astParams_array))
    # Scan the array for the entry named "choixIndexeur". The structure's
    # fields are capitalised (Name / Value), not name / value.
    for item in astParams_array:
        if item.Name == "choixIndexeur":
            choixIndexeur = item.Value
            break
    else:
        raise LookupError('astParams has no entry named "choixIndexeur"')

    # Value may arrive as a string (the structure defaults to ""), while
    # indexeurMoveUp selects on the integers 1-3.
    indexeurMoveUp(int(choixIndexeur))

    xBusy_var.set_value(False)
    xReady_var.set_value(True)

# Script entry point: build the OPC-UA server, register the methods and
# status variables on the Objects node, then start serving.
if __name__ == "__main__":
    print("Début du programme")
 
    #OPC-UA SERVER
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # All methods and variables are attached directly to the Objects node.
    base = server.get_objects_node()
    # Each method takes no input arguments; all but mReset and mSafetyStop
    # declare a single String output (their acknowledgement string).
    # NOTE(review): the first argument varies per call (1 to 6, with 1
    # reused three times) -- presumably a namespace index; confirm that
    # this numbering is intended.
    base.add_method(1, "mLoadElement", mLoadElement, [], [ua.VariantType.String])
    base.add_method(2, "mUnloadElement",mUnloadElement,[],[ua.VariantType.String])
    base.add_method(3, "mAckError", mAckError,[],[ua.VariantType.String])
    base.add_method(4, "mInitialize", mInitialize,[],[ua.VariantType.String])
    base.add_method(5, "mReset", mReset,[],[])
    base.add_method(6, "mSafetyStop", mSafetyStop,[],[])
    base.add_method(1, "mRetractUnload", mRetractUnload, [], [ua.VariantType.String])
    base.add_method(1, "mRetractElement", mRetractElement, [], [ua.VariantType.String])

    # Status flags read and written by the method handlers through these
    # module-level names. Initial state: not busy, ready, no error.
    xBusy_var = base.add_variable(ua.NodeId("xBusy", 7), "xBusy", False)
    xReady_var = base.add_variable(ua.NodeId("xReady", 8), "xReady", True)
    xError_var = base.add_variable(ua.NodeId("xError", 9), "xError", False)

    # Array of 20 name/value parameters.
    # NOTE: ST_NameValue is a plain Python class, not a registered OPC UA
    # data type, so the add_variable call below fails inside the library's
    # _guess_datatype with
    # "AttributeError: type object 'ObjectIds' has no attribute 'ST_NameValue'".
    initial_values = [ST_NameValue("nameTest","valueTest") for _ in range(20)]
    astParams_var = base.add_variable(ua.NodeId("astParams", 10), "astParams", initial_values,varianttype=ua.VariantType.ExtensionObject)
    astParams_var.set_array_dimensions([20])

    server.start()

这是我得到的错误:

Traceback (most recent call last):
  File "/home/dev/opcuaServer.py", line 227, in <module>
    astParams_var = base.add_variable(ua.NodeId("astParams", 10), "astParams", initial_values,varianttype=ua.VariantType.ExtensionObject)
  File "/home/.local/lib/python3.10/site-packages/opcua/common/node.py", line 664, in add_variable
    return opcua.common.manage_nodes.create_variable(self, nodeid, bname, val, varianttype, datatype)
  File "/home/.local/lib/python3.10/site-packages/opcua/common/manage_nodes.py", line 89, in create_variable
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
  File "/home/.local/lib/python3.10/site-packages/opcua/common/manage_nodes.py", line 232, in _create_variable
    attrs.DataType = _guess_datatype(var)
  File "/home/.local/lib/python3.10/site-packages/opcua/common/manage_nodes.py", line 370, in _guess_datatype
    return ua.NodeId(getattr(ua.ObjectIds, classname))
AttributeError: type object 'ObjectIds' has no attribute 'ST_NameValue'

我尝试创建一个 int 数组,然后使用测试客户端将我的 Struct 的一些值放入其中,例如:

     # Client-side attempt: write a single ST_NameValue into the astParams node.
     toto = ST_NameValue("choixIndexeur", 1)
     astParams_node_id = ua.NodeId("astParams", 10)
     astParams_var = client.get_node(astParams_node_id)
     astParams_var.set_value([toto])

但我有以下错误:

Traceback (most recent call last):
  File "/home/dev/opcuaClient.py", line 36, in <module>
    astParams_var.set_value([toto])
  File "/home/.local/lib/python3.10/site-packages/opcua/common/node.py", line 217, in set_value
    self.set_attribute(ua.AttributeIds.Value, datavalue)
  File "/home/.local/lib/python3.10/site-packages/opcua/common/node.py", line 262, in set_attribute
    result = self.server.write(params)
  File "/home/.local/lib/python3.10/site-packages/opcua/client/ua_client.py", line 367, in write
    data = self._uasocket.send_request(request)
  File "/home/.local/lib/python3.10/site-packages/opcua/client/ua_client.py", line 81, in send_request
    future = self._send_request(request, callback, timeout, message_type)
  File "/home/.local/lib/python3.10/site-packages/opcua/client/ua_client.py", line 55, in _send_request
    binreq = struct_to_binary(request)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 261, in struct_to_binary
    packet.append(to_binary(uatype, val))
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 284, in to_binary
    return struct_to_binary(val)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 256, in struct_to_binary
    packet.append(list_to_binary(uatype[6:], val))
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 296, in list_to_binary
    pack = [to_binary(uatype, el) for el in val]
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 296, in <listcomp>
    pack = [to_binary(uatype, el) for el in val]
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 284, in to_binary
    return struct_to_binary(val)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 261, in struct_to_binary
    packet.append(to_binary(uatype, val))
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 274, in to_binary
    return pack_uatype(vtype, val)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 194, in pack_uatype
    return struct_to_binary(value)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 261, in struct_to_binary
    packet.append(to_binary(uatype, val))
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 274, in to_binary
    return pack_uatype(vtype, val)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 192, in pack_uatype
    return variant_to_binary(value)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 373, in variant_to_binary
    b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 224, in pack_uatype_array
    b = [pack_uatype(vtype, val) for val in array]
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 224, in <listcomp>
    b = [pack_uatype(vtype, val) for val in array]
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 188, in pack_uatype
    return extensionobject_to_binary(value)
  File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 458, in extensionobject_to_binary
    TypeId = ua.extension_object_ids[obj.__class__.__name__]
KeyError: 'ST_NameValue'
python opc-ua
1个回答
0
投票

不能直接使用普通的 Python 类。您必须创建专门的 OPC UA 数据类型:

  # Register ST_NameValue as an OPC UA structure with two String fields.
  # NOTE(review): new_struct / new_struct_field presumably come from the
  # asyncua (opcua-asyncio) package used by the linked example, and `await`
  # requires an async function -- confirm the import path.
  snode1, _ = await new_struct(server, idx, "ST_NameValue", [
        new_struct_field("Name", ua.VariantType.String),
        new_struct_field("Value", ua.VariantType.String),
    ])

要使用它们,请使用 `ua.ST_NameValue`:

# Build the 20-element array from ua.ST_NameValue rather than from a plain
# Python class, then create the astParams variable from it.
initial_values = [ua.ST_NameValue("nameTest","valueTest") for _ in range(20)]
astParams_var = base.add_variable(ua.NodeId("astParams", 10), "astParams", initial_values,varianttype=ua.VariantType.ExtensionObject)

有关更完整的示例,请参阅 https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/server-custom-structs-and-enums.py

© www.soinside.com 2019 - 2024. All rights reserved.