线程中的 Python asyncua 获取任何值都会延迟

问题描述 投票:0回答:0

我想使用 asyncua 订阅一些变量。 我使用线程是因为我需要每隔一段时间动态更新订阅列表。 但是当我收到数据更改通知并尝试读取节点浏览名称时,它会延迟大约 2 秒。 我如何解决它? 这是我的代码。

import asyncio

from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
import pymssql
from threading import Thread

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    n = 1
    def __init__(self, variables_len=0):
        self._queue = asyncio.Queue()
        self.variables_len = variables_len
        conn = pymssql.connect(server='(local)', user='sa',
                               password='sa', database='AdventureWorks2005')
        self.cursor = conn.cursor()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        if self.n > self.variables_len:
            self._queue.put_nowait([node, value, data])
            print(f'Data change notification was received and queued.')
            self.n = self.variables_len+1
        self.n += 1

    async def process(self) -> None:
        try:
            while True:
                # Get data in a queue.
                [node, value, data] = self._queue.get_nowait()

                # *** Write your processing code ***
                print(await node.read_browse_name())   #delay here
                self.cursor.execute('select TOP(1) * from tool_list')  #test code
                row = self.cursor.fetchall()
                print(row)

        except asyncio.QueueEmpty:
            pass


async def main() -> None:
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        obj = await client.get_objects_node().get_child([f'{idx}:MyObject'])
        variables = await obj.get_variables()
        # Subscribe data change.
        handler = MyHandler(variables_len=len(variables))
        subscription = await client.create_subscription(period=0, handler=handler)
        handle = await subscription.subscribe_data_change(variables)


        # Process data change every 100ms
        async def insert_to_sql():
            while True:
                await handler.process()
                #service_level = await client.get_node('i=2267').get_value()
                await asyncio.sleep(0.1)

        def between_callback():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(some_callback())
            loop.close()

        _thread = Thread(target=between_callback)
        _thread.daemon = True
        _thread.start()

        while True:
            # Get a variable node.
            idx = await client.get_namespace_index(NAMESPACE)
            obj = await client.get_objects_node().get_child([f'{idx}:MyObject'])
            variables_new = await obj.get_variables()
            if variables != variables_new:
                handler.variables_len = len(variables_new)
                handler.n = 1
                await subscription.unsubscribe(handle)
                handle = await subscription.subscribe_data_change(variables_new)
                variables = variables_new
            print('update done')
        await asyncio.sleep(100)



if __name__ == '__main__':
    asyncio.run(main())

我尝试将 **print(await node.read_browse_name()) ** 更改为 datachange_notification 函数,但结果相同

python python-asyncio python-multithreading opc-ua
© www.soinside.com 2019 - 2024. All rights reserved.