I subscribe to several nodes at once, so I only get notified when a value changes. I need to store the data in a CSV file, writing rows with a timestamp plus the values of all nodes. I implemented an asyncio.Queue to serialize the writes, but when several notifications arrive at the same time, the file is only updated with the first one and the other notifications are lost.
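The idea I am trying to follow is roughly this (a stripped-down sketch with a dummy print instead of the real CSV write; writer_task and notify are just placeholder names, the real code is further down):

import asyncio

async def writer_task(queue):
    # consumer: handle one queued item at a time so writes never overlap
    while True:
        item = await queue.get()
        print("writing", item)   # stand-in for the real CSV write
        queue.task_done()

async def notify(queue, value):
    # producer: a data change notification just enqueues its value
    await queue.put(value)

async def main():
    queue = asyncio.Queue()
    asyncio.create_task(writer_task(queue))
    # two notifications arriving practically at the same time
    await asyncio.gather(notify(queue, 19.73), notify(queue, 36.52))
    await queue.join()   # wait until both items have been processed

asyncio.run(main())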
Here is the output:
sv_writer_task started
Queue list:
<Queue maxsize=0>
Datachange notification received.
29032024 14:10:16Data change notification was received and queued: ns=2;s=Unit_SB.UC3.TEMP.TEMP_v 19.731399536132812
Datachange notification received.
29032024 14:10:16Data change notification was received and queued: ns=2;s=Unit_SB.UC3.HUMI.HUMI_v 36.523399353027344
Dequeue value: 19.731399536132812
val name: TEMP
Prefix UC1 is no match for node: Unit_SB.UC3.TEMP.TEMP_v. Skipping.
Prefix UC2 is no match for node: Unit_SB.UC3.TEMP.TEMP_v. Skipping.
csv file: UC3.csv
Appending new row
Data written to file UC3.csv successfully
Dequeue value: 36.523399353027344
val name: HUMI
Prefix UC1 is no match for node: Unit_SB.UC3.HUMI.HUMI_v. Skipping.
Prefix UC2 is no match for node: Unit_SB.UC3.HUMI.HUMI_v. Skipping.
csv file: UC3.csv
Updating existing row
Data written to file UC3.csv successfully
In the CSV file you can see on the last row that only the TEMP value, which arrived first, was changed, while the HUMI value was not:
Timestamp,OUT_G,OUT_U,HUMI,TEMP
29032024 14:08:42,True,True,47.38769912719727,15.043899536132812
29032024 14:10:16,True,True,47.38769912719727,19.731399536132812
Here is the code so far. Since I subscribe to several subsystems and each one has its own data file, I check which file the value should be written to:
# this is the consumer which consumes items from the queue
async def csv_writer_task(queue):
    print("csv_writer_task started")
    print('Queue list: ')
    print(queue)
    print('')
    while True:
        try:
            dte, node, val = await queue.get()
            print('Dequeue value: ', val)
        except Exception as e:
            print(f"Error in csv_writer_task while retrieving value from the queue: {e}")

        node_id_str = str(node.nodeid.Identifier)
        node_parts = node_id_str[len("Unit_SB."):].split('.')
        val_name = node_parts[-1].replace('_v', '')
        print('val name: ', val_name)

        # pick the csv file of the subsystem this node belongs to
        for key, header_row in prefix_mapping.items():
            if f"Unit_SB.{key}" in node_id_str:
                csv_file = f"{key}.csv"
                print('csv file: ', csv_file)
                break
            else:
                print(f"Prefix {key} is no match for node: {node_id_str}. Skipping.")

        df = pd.read_csv(csv_file)
        last_row = df.iloc[-1].copy()
        if last_row['Timestamp'] == dte:
            print("Updating existing row")
            last_row[val_name] = val
        else:
            print("Appending new row")
            new_row = last_row.copy()
            new_row['Timestamp'] = dte
            new_row[val_name] = val
            df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
        df.to_csv(csv_file, index=False)
        print(f'Data written to file {csv_file} successfully')
        queue.task_done()


class SubscriptionHandler(object):
    def __init__(self, wsconn):
        self.wsconn = wsconn
        self.q = asyncio.Queue()
        self.queuwriter = asyncio.create_task(csv_writer_task(self.q))

    # this is the producer which produces items and puts them into a queue
    async def datachange_notification(self, node, val, data):
        print("Datachange notification received.")
        dte = data.monitored_item.Value.ServerTimestamp.strftime("%d%m%Y %H:%M:%S")
        await self.q.put((dte, node, val))
        print(dte + "Data change notification was received and queued: ", node, val)


class SBConnection():
    def __init__(self):
        self.listOfWSNode = []
        self.dpsList = ...

    async def connectAndSubscribeToServer(self):
        self.csv_file = ''
        async with Client(url=self.url) as self.client:
            for element in self.dpsList:
                node = "ns=" + element["NS"] + ";s=" + element["Name"]
                var = self.client.get_node(node)
                self.listOfWSNode.append(var)
            print("Subscribe to ", self.listOfWSNode)
            handler = SubscriptionHandler(self)
            sub = await self.client.create_subscription(period=10, handler=handler)
            await sub.subscribe_data_change(self.listOfWSNode)
            print('subscription created')
            # create infinite loop
            while True:
                await asyncio.sleep(0.1)


async def main():
    uc = SBConnection()
    await uc.connectAndSubscribeToServer()


if __name__ == '__main__':
    asyncio.run(main())
Can anyone help me find out what is going wrong? Thanks.
Corrected code after the comments:
async def csv_writer_task(queue):
    print("csv_writer_task started")
    print('Queue list: ')
    print(queue)
    print('')
    while True:
        try:
            dte, node, val = await queue.get()
            print('Dequeue value: ', val)
        except Exception as e:
            print(f"Error in csv_writer_task while retrieving value from the queue: {e}")

        node_id_str = str(node.nodeid.Identifier)
        node_parts = node_id_str[len("Unit_SB."):].split('.')
        val_name = node_parts[-1].replace('_v', '')
        print('val name: ', val_name)

        for key, header_row in prefix_mapping.items():
            if f"Unit_SB.{key}" in node_id_str:
                csv_file = f"{key}.csv"
                print('csv file: ', csv_file)
                break
            else:
                print(f"Prefix {key} is no match for node: {node_id_str}. Skipping.")

        df = pd.read_csv(csv_file)
        if df.iloc[-1]['Timestamp'] == dte:
            print("Updating existing row")
            df.loc[df.index[-1], val_name] = val  # Modify the last row in the dataframe directly
        else:
            print("Appending new row")
            new_row = df.iloc[-1].copy()
            new_row['Timestamp'] = dte
            new_row[val_name] = val
            df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
        df.to_csv(csv_file, index=False)
        print(f'Data written to file {csv_file} successfully')
        queue.task_done()
As pointed out in the comments, the problem was that I was modifying a copy of the last row and then saving the original DataFrame, so the change was lost.
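For anyone running into the same thing, here is a minimal standalone demonstration of the pitfall, using made-up values rather than the real node data:

import pandas as pd

df = pd.DataFrame({"Timestamp": ["29032024 14:10:16"], "TEMP": [19.73], "HUMI": [47.38]})

# Buggy: .copy() gives an independent Series, so the DataFrame never sees the change
last_row = df.iloc[-1].copy()
last_row["HUMI"] = 36.52
print(df.iloc[-1]["HUMI"])   # still 47.38 -> the update would be lost by to_csv()

# Fixed: assign through .loc on the last index so the DataFrame itself is modified
df.loc[df.index[-1], "HUMI"] = 36.52
print(df.iloc[-1]["HUMI"])   # 36.52 -> this is the value that ends up in the file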