同时保存到文件 opcua 数据更改通知

问题描述 投票:0回答:1

我订阅了多个节点,这样只有当值发生变化时我才会收到通知。 我必须将数据保存到 csv 文件。这个想法是用时间戳+节点的所有值写入行。 但是,如果我同时收到多个通知,则使用我的代码,文件会随着第一个通知而更新,但不会更新丢失的其他通知。 例如,如果我收到通知:

23112023 17:22:26       OPCUA Data change event ns=2;s=HUMI.HUMI_v 33.30080032348633
23112023 17:22:26       OPCUA Data change event ns=2;s=TEMP.TEMP_v 19.26759910583496

仅湿度值会更新,而温度值不会改变。

我尝试使用 asyncio.Lock() 来同步对 CSV 文件的访问,但没有任何改变。我还尝试使用 pandas 而不是 csv 模块。

这是我的 pandas 通知代码:

class SubscriptionHandler(object):
    def __init__(self, wsconn):
        self.wsconn = wsconn
        self.csv_lock = asyncio.Lock()  # initialize the loc

    async def datachange_notification(self, node, val, data):
        dte = data.monitored_item.Value.ServerTimestamp.strftime("%d%m%Y %H:%M:%S")
        print(dte + "\tOPCUA Data change event", node, val)
        val_name = str(node.nodeid.Identifier)

        csv_file = test.csv

        async with self.csv_lock:
            df = pd.read_csv(csv_file)
            last_row = df.iloc[-1].copy()
            if last_row['Timestamp'] == dte:
                last_row[val_name] = val
            else:
                new_row = last_row.copy()
                new_row['Timestamp'] = dte
                new_row[val_name] = val
                df = pd.concat([df, new_row.to_frame().T], ignore_index=True)

            df.to_csv(csv_file, index=False)

有人可以指出如何解决这个问题吗?

python asynchronous subscription opc-ua
1个回答
0
投票

您添加单独的异步任务,它有一个队列来仅记录数据。

async def csv_writer_task(queue):
    csv_file = test.csv
    while True:
        dte, val, val_name = await queue.get()
        # check for stop signal
        if item is None:
            break
        df = pd.read_csv(csv_file)
        last_row = df.iloc[-1].copy()
        if last_row['Timestamp'] == dte:
           last_row[val_name] = val
        else:
           new_row = last_row.copy()
           new_row['Timestamp'] = dte
           new_row[val_name] = val
           df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
        df.to_csv(csv_file, index=False)

class SubscriptionHandler(object):
    def __init__(self, wsconn):
        self.wsconn = wsconn
        self.q = asyncio.Queue()
        self.queuwriter = asyncio.create_task(csv_writer_task(self.q))
   
   async def datachange_notification(self, node, val, data):
        dte = data.monitored_item.Value.ServerTimestamp.strftime("%d%m%Y %H:%M:%S")
        print(dte + "\tOPCUA Data change event", node, val)
        val_name = str(node.nodeid.Identifier)
        self.queue.put_nowait((dte, val, val_name))
© www.soinside.com 2019 - 2024. All rights reserved.