我正在尝试在 asyncio 中编写一个对等节点。两个相关的功能是让节点回答查询并获得关于它如何回答所述查询的反馈。为了回答查询,节点可以将查询转发到另一个节点,这也需要将任何收到的反馈转发到同一节点。我的问题是,当节点 A 转发到节点 B 并且节点 B 转发回 A 时,回答查询的任务被取消,给出“任务已被销毁但它正在等待!”。
只有当节点将查询转发回它从中获取查询的节点时才会发生此错误。
启动节点的代码是:
async def run_server(host='127.0.0.1', port=0, known=None):
global OWN_HOST, OWN_PORT
server = await asyncio.start_server(handle_request, host, port)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
OWN_HOST, OWN_PORT = server.sockets[0].getsockname()
print(f'Serving on {addrs}')
if known:
await neighbors_add_all(known)
async with server:
await asyncio.gather(server.serve_forever(), manage_neighbors())
handle request 函数是(删掉不相关的东西):
async def handle_request(reader, writer):
request_type = int((await reader.read(1))[0])
task = asyncio.current_task()
task.set_name(task.get_name() + '_' + str(request_type))
addr = writer.get_extra_info('peername')
print(f'\nRequest from {addr}')
if request_type == REQUEST_PING:
...
[Unrelated stuff]
...
elif request_type == REQUEST_QUERY:
await handle_query(reader, writer)
elif request_type == REQUEST_FEEDBACK:
await handle_feedback(reader, writer)
else:
print("ERROR UNRECOGNIZED REQUEST")
writer.close()
await writer.wait_closed()
我将只关注反馈部分,因为这是导致错误最多的部分,它们都以同样的方式失败,而且它已经是一堵文字墙了。下面是反馈中涉及的两个函数:
async def handle_feedback(reader, writer):
task = asyncio.current_task()
task.set_name(task.get_name() + '_started')
print(f"Request to give feedback from {writer.get_extra_info('peername')}")
# read addr
addr_len = int.from_bytes(await reader.read(4), 'big')
addr = await reader.read(addr_len)
# read signature
sig_len = int.from_bytes(await reader.read(4), 'big')
sig = await reader.read(sig_len)
# read qid
qid_len = int.from_bytes(await reader.read(4), 'big')
qid = await reader.read(qid_len)
# read reward
feedback_len = int.from_bytes(await reader.read(4), 'big')
feedback = await reader.read(feedback_len)
task.set_name(task.get_name() + '_gotbytes')
sig_valid, message = await wallet_verify_signature(addr, sig, qid + feedback)
async with _queries_lock:
qid_exists = qid in _queries
if not sig_valid:
message = message.encode()
message_length = len(message).to_bytes(4, 'big')
writer.write(b'\x01' + message_length + message)
await writer.drain()
return
elif not qid_exists:
message = "qid not recognized".encode()
message_len = len(message).to_bytes(4, 'big')
writer.write(b'\x02' + message_length + message)
await writer.drain()
return
## Processing feedback ##
async with _queries_lock:
caller_addr, query_vec, content, neighbor, new_query_vec, other_qid = _queries[qid]
if caller_addr != addr:
message = "Address does not match address that made query".encode()
message_len = len(message).to_bytes(4, 'big')
writer.write(b'\x03' + message_len + message)
await writer.drain()
return
async with _queries_lock:
del _queries[qid]
# Ensure feedback not above balance and remove from balance
feedback = int.from_bytes(feedback, 'big')
async with _funding_lock:
if _balances[addr] == 0:
feedback = 1
_balances[addr] = 1
elif feedback > _balances[addr]:
feedback = _balances[addr]
# Refund query cost for feedback
_balances[addr] = 1
elif feedback > 1:
_balances[addr] = _balances[addr] - feedback + 1
# Refund query cost for feedback
_balances[addr] += 1
elif feedback == 1:
# Refund query cost for feedback
_balances[addr] += 1
#adjust vector of query
await query_update_model(feedback, query_vec, content, neighbor, new_query_vec)
task.set_name(task.get_name() + '_b4forward')
# Forward feedback
if other_qid is not None:
new_feedback = (max(feedback - 1, 1) if feedback > 0 else 0).to_bytes(feedback_len, 'big')
#
#
#
####### THIS IS WHERE THE QUERY GETS FORWARDED ##########
await request_send_feedback(*neighbor, other_qid, new_feedback)
message = "ack".encode()
message_len = len(message)
writer.write(b'\x00' + message_len.to_bytes(4, 'big') + message)
await writer.drain()
task.set_name(task.get_name() + '_finished')
async def request_send_feedback(host, port, qid, feedback):
task = asyncio.current_task()
if type(feedback) is not bytes:
feedback = feedback.to_bytes(8, 'big')
reader, writer = await asyncio.open_connection(host, port)
peername = writer.get_extra_info("peername")
sig = await wallet_sign_bytes(qid + feedback)
addr_len = len(OWN_ADDR)
sig_len = len(sig)
qid_len = len(qid)
feedback_len = len(feedback)
message = REQUEST_FEEDBACK.to_bytes(1, 'big') + \
addr_len.to_bytes(4, 'big') + OWN_ADDR + \
sig_len.to_bytes(4, 'big') + sig + \
qid_len.to_bytes(4, 'big') + qid + \
feedback_len.to_bytes(4, 'big') + feedback
print(f'Sending feedback to {peername} from {writer.get_extra_info("sockname")}')
writer.write(message)
await writer.drain()
print(f'Feedback sent')
task.set_name(task.get_name() + '_here')
#
#
#
######### THIS IS THE LINE THAT FAILS ##############
resp_code = await reader.read(1)
task.set_name(task.get_name() + '_here2')
resp_code = int.from_bytes(resp_code, 'big')
task.set_name(task.get_name() + '_here3')
print(f'Response code: {resp_code}')
message_len = int.from_bytes(await reader.read(4), 'big')
print(f'Message len: {message_len}')
message = (await reader.read(message_len)).decode()
if resp_code == 0:
print(f"Response from {peername} to feedback: {message}")
else:
print(f'Error giving feedback to {peername}: {message}')
writer.close()
await writer.wait_closed()
任务名称的更改是我用来查找错误发生位置的。被取消调用错误的任务的名称总是 任务-XXX_8_started_gotbytes_b4forward_here 用数字代替 XXX.
为什么尝试从阅读器读取时任务被取消? 有没有办法允许相同的递归行为而不会导致错误?
我能找到的关于“Pending Task Was Destroyed”错误的所有内容都谈到了如何正确创建任务,以便垃圾收集器不会删除它们,但我没有手动创建任务。我必须做些什么才能将任务保存在使用 asyncio 的 start_server 制作的服务器上吗?
感谢您的宝贵时间,并对文字墙表示歉意。