同时保存到csv opcua数据更改通知

问题描述 投票:0回答:1

我同时订阅了多个节点,这样我只有在值发生变化时才会收到通知。我需要将数据存储在 csv 文件中,并使用时间戳 + 节点的所有值写入行。我实现了 asyncio.Queue 来处理同时写入,但如果我同时收到多个通知,文件会随着第一个通知而更新,但不会更新其他丢失的通知。

这是输出:

sv_writer_task started
Queue list: 
<Queue maxsize=0>

Datachange notification received.
29032024 14:10:16Data change notification was received and queued:  ns=2;s=Unit_SB.UC3.TEMP.TEMP_v 19.731399536132812
Datachange notification received.
29032024 14:10:16Data change notification was received and queued:  ns=2;s=Unit_SB.UC3.HUMI.HUMI_v 36.523399353027344

Dequeue value:  19.731399536132812
val name:  TEMP
Prefix UC1 is no match for node: Unit_SB.UC3.TEMP.TEMP_v. Skipping.
Prefix UC2 is no match for node: Unit_SB.UC3.TEMP.TEMP_v. Skipping.
csv file:  UC3.csv
Appending new row
Data written to file UC3.csv succesfully

Dequeue value:  36.523399353027344
val name:  HUMI
Prefix UC1 is no match for node: Unit_SB.UC3.HUMI.HUMI_v. Skipping.
Prefix UC2 is no match for node: Unit_SB.UC3.HUMI.HUMI_v. Skipping.
csv file:  UC3.csv
Updating existing row
Data written to file UC3.csv succesfully

在 csv 文件中,我们可以在最后一行看到,只有先到达的 TEMP 值发生了更改,但 HUMI 值没有更改:

Timestamp,OUT_G,OUT_U,HUMI,TEMP
29032024 14:08:42,True,True,47.38769912719727,15.043899536132812
29032024 14:10:16,True,True,47.38769912719727,19.731399536132812

这是到目前为止的代码。由于我订阅了多个子系统,并且每个子系统都有自己的数据文件,因此我正在检查要写入哪个文件:

# this is the consumer which consumes items from the queue
async def csv_writer_task(queue):
    print("csv_writer_task started")
    print('Queue list: ')
    print(queue)
    print('')
    while True:
        try:
            dte, node, val = await queue.get()
            print('Dequeue value: ', val)
        except Exception as e:
            print(f"Error in csv_writer_task while retrieving value fromt he queue: {e}")

        node_id_str = str(node.nodeid.Identifier)
        node_parts = node_id_str[len("Unit_SB."):].split('.')
        val_name = node_parts[-1].replace('_v', '')
        print('val name: ', val_name)
        for key, header_row in prefix_mapping.items():
            if f"Unit_SB.{key}" in node_id_str:
                csv_file = f"{key}.csv"
                print('csv file: ', csv_file)
                break
            else:
                print(f"Prefix {key} is no match for node: {node_id_str}. Skipping.")
        df = pd.read_csv(csv_file)
        last_row = df.iloc[-1].copy()
        if last_row['Timestamp'] == dte:
            print("Updating existing row")
            last_row[val_name] = val
        else:
            print("Appending new row")
            new_row = last_row.copy()
            new_row['Timestamp'] = dte
            new_row[val_name] = val
            df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
        df.to_csv(csv_file, index=False)
        print(f'Data written to file {csv_file} succesfully')
        queue.task_done()


class SubscriptionHandler(object):
    def __init__(self, wsconn):
        self.wsconn = wsconn
        self.q = asyncio.Queue()
        self.queuwriter = asyncio.create_task(csv_writer_task(self.q))

    # this is the producer which produces items and puts them into a queue
    async def datachange_notification(self, node, val, data):
        print("Datachange notification received.")
        dte = data.monitored_item.Value.ServerTimestamp.strftime("%d%m%Y %H:%M:%S")
        await self.q.put((dte, node, val))
        print(dte + "Data change notification was received and queued: ", node, val)


class SBConnection():
    def __init__(self):
        self.listOfWSNode = []
        self.dpsList = ...

    async def connectAndSubscribeToServer(self):
        self.csv_file = ''

        async with Client(url=self.url) as self.client:
            for element in self.dpsList:
                node = "ns=" + element["NS"] + ";s=" + element["Name"]
                var = self.client.get_node(node)
                self.listOfWSNode.append(var)
            print("Subscribe to ", self.listOfWSNode)

            handler = SubscriptionHandler(self)
            sub = await self.client.create_subscription(period=10, handler=handler)

            await sub.subscribe_data_change(self.listOfWSNode)
            print('subscription created')

            # create infinite loop
            while True:
                await asyncio.sleep(0.1)


async def main():
    uc = SBConnection()
    await uc.connectAndSubscribeToServer()

if __name__ == '__main__':
    asyncio.run(main())

谁能帮我找出问题所在吗? 谢谢

评论后更正代码:

async def csv_writer_task(queue):
    print("csv_writer_task started")
    print('Queue list: ')
    print(queue)
    print('')
    while True:
        try:
            dte, node, val = await queue.get()
            print('Dequeue value: ', val)
        except Exception as e:
            print(f"Error in csv_writer_task while retrieving value fromt he queue: {e}")

        node_id_str = str(node.nodeid.Identifier)
        node_parts = node_id_str[len("Unit_SB."):].split('.')
        val_name = node_parts[-1].replace('_v', '')
        print('val name: ', val_name)
        for key, header_row in prefix_mapping.items():
            if f"Unit_SB.{key}" in node_id_str:
                csv_file = f"{key}.csv"
                print('csv file: ', csv_file)
                break
            else:
                print(f"Prefix {key} is no match for node: {node_id_str}. Skipping.")
        df = pd.read_csv(csv_file)
        if df.iloc[-1]['Timestamp'] == dte:
            print("Updating existing row")
            df.loc[df.index[-1], val_name] = val  # Modify the last row in the dataframe directly
        else:
            print("Appending new row")
            new_row = df.iloc[-1].copy()
            new_row['Timestamp'] = dte
            new_row[val_name] = val
            df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
        
        df.to_csv(csv_file, index=False)
        print(f'Data written to file {csv_file} succesfully')
        queue.task_done()
python asynchronous python-asyncio subscription opc-ua
1个回答
0
投票

如评论中所述,解决方案是我正在修改最后一行的副本,但随后我保存了原始行。这样做我就失去了改变。

© www.soinside.com 2019 - 2024. All rights reserved.