我订阅了多个节点,这样只有当值发生变化时我才会收到通知。 我必须将数据保存到 csv 文件。这个想法是用时间戳+节点的所有值写入行。 但是,如果我同时收到多个通知,则使用我的代码,文件会随着第一个通知而更新,但不会更新丢失的其他通知。 例如,如果我收到通知:
23112023 17:22:26 OPCUA Data change event ns=2;s=HUMI.HUMI_v 33.30080032348633
23112023 17:22:26 OPCUA Data change event ns=2;s=TEMP.TEMP_v 19.26759910583496
仅湿度值会更新,而温度值不会改变。
我尝试使用 asyncio.Lock() 来同步对 CSV 文件的访问,但没有任何改变。我还尝试使用 pandas 而不是 csv 模块。
这是我的 pandas 通知代码:
class SubscriptionHandler(object):
def __init__(self, wsconn):
self.wsconn = wsconn
self.csv_lock = asyncio.Lock() # initialize the loc
async def datachange_notification(self, node, val, data):
dte = data.monitored_item.Value.ServerTimestamp.strftime("%d%m%Y %H:%M:%S")
print(dte + "\tOPCUA Data change event", node, val)
val_name = str(node.nodeid.Identifier)
csv_file = test.csv
async with self.csv_lock:
df = pd.read_csv(csv_file)
last_row = df.iloc[-1].copy()
if last_row['Timestamp'] == dte:
last_row[val_name] = val
else:
new_row = last_row.copy()
new_row['Timestamp'] = dte
new_row[val_name] = val
df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
df.to_csv(csv_file, index=False)
有人可以指出如何解决这个问题吗?
您添加单独的异步任务,它有一个队列来仅记录数据。
async def csv_writer_task(queue):
csv_file = test.csv
while True:
dte, val, val_name = await queue.get()
# check for stop signal
if item is None:
break
df = pd.read_csv(csv_file)
last_row = df.iloc[-1].copy()
if last_row['Timestamp'] == dte:
last_row[val_name] = val
else:
new_row = last_row.copy()
new_row['Timestamp'] = dte
new_row[val_name] = val
df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
df.to_csv(csv_file, index=False)
class SubscriptionHandler(object):
def __init__(self, wsconn):
self.wsconn = wsconn
self.q = asyncio.Queue()
self.queuwriter = asyncio.create_task(csv_writer_task(self.q))
async def datachange_notification(self, node, val, data):
dte = data.monitored_item.Value.ServerTimestamp.strftime("%d%m%Y %H:%M:%S")
print(dte + "\tOPCUA Data change event", node, val)
val_name = str(node.nodeid.Identifier)
self.queue.put_nowait((dte, val, val_name))