Python 3.10 asyncio 服务器任务在从阅读器读取时被破坏

问题描述 投票:0回答:0

我正在尝试在 asyncio 中编写一个对等节点。两个相关的功能是让节点回答查询并获得关于它如何回答所述查询的反馈。为了回答查询,节点可以将查询转发到另一个节点,这也需要将任何收到的反馈转发到同一节点。我的问题是,当节点 A 转发到节点 B 并且节点 B 转发回 A 时,回答查询的任务被取消,给出“任务已被销毁但它正在等待!”。

只有当节点将查询转发回它从中获取查询的节点时才会发生此错误。

启动节点的代码是:

async def run_server(host='127.0.0.1', port=0, known=None):
    global OWN_HOST, OWN_PORT
    server = await asyncio.start_server(handle_request, host, port)
    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    OWN_HOST, OWN_PORT = server.sockets[0].getsockname()
    print(f'Serving on {addrs}')

    if known:
        await neighbors_add_all(known)

    async with server:
        await asyncio.gather(server.serve_forever(), manage_neighbors())

handle request 函数是(删掉不相关的东西):

async def handle_request(reader, writer):
    request_type = int((await reader.read(1))[0])
    task = asyncio.current_task()
    task.set_name(task.get_name() + '_' + str(request_type))

    addr = writer.get_extra_info('peername')
    print(f'\nRequest from {addr}')
    if request_type == REQUEST_PING:
        ...
        [Unrelated stuff]
        ...
    elif request_type == REQUEST_QUERY:
        await handle_query(reader, writer)
    elif request_type == REQUEST_FEEDBACK:
        await handle_feedback(reader, writer)
    else:
        print("ERROR UNRECOGNIZED REQUEST")
    writer.close()
    await writer.wait_closed()

我将只关注反馈部分,因为这是导致错误最多的部分,它们都以同样的方式失败,而且它已经是一堵文字墙了。下面是反馈中涉及的两个函数:

async def handle_feedback(reader, writer):
    task = asyncio.current_task()
    task.set_name(task.get_name() + '_started')
    print(f"Request to give feedback from {writer.get_extra_info('peername')}")

    # read addr
    addr_len = int.from_bytes(await reader.read(4), 'big')
    addr = await reader.read(addr_len)
    # read signature
    sig_len = int.from_bytes(await reader.read(4), 'big')
    sig = await reader.read(sig_len)
    # read qid 
    qid_len = int.from_bytes(await reader.read(4), 'big')
    qid = await reader.read(qid_len)
    # read reward
    feedback_len = int.from_bytes(await reader.read(4), 'big')
    feedback = await reader.read(feedback_len)

    task.set_name(task.get_name() + '_gotbytes')
    sig_valid, message = await wallet_verify_signature(addr, sig, qid + feedback)
    async with _queries_lock:
        qid_exists = qid in _queries
    if not sig_valid:
        message = message.encode()
        message_length = len(message).to_bytes(4, 'big')
        writer.write(b'\x01' + message_length + message)
        await writer.drain()
        return
    elif not qid_exists:
        message = "qid not recognized".encode()
        message_len = len(message).to_bytes(4, 'big')
        writer.write(b'\x02' + message_length + message)
        await writer.drain()
        return

    ## Processing feedback ##
    async with _queries_lock:
        caller_addr, query_vec, content, neighbor, new_query_vec, other_qid = _queries[qid]
    if caller_addr != addr:
        message = "Address does not match address that made query".encode()
        message_len = len(message).to_bytes(4, 'big')
        writer.write(b'\x03' + message_len + message)
        await writer.drain()
        return
    async with _queries_lock:
        del _queries[qid]

    # Ensure feedback not above balance and remove from balance
    feedback = int.from_bytes(feedback, 'big')
    async with _funding_lock:
        if _balances[addr] == 0:
            feedback = 1
            _balances[addr] = 1
        elif feedback > _balances[addr]:
            feedback = _balances[addr]
            # Refund query cost for feedback
            _balances[addr] = 1
        elif feedback > 1:
            _balances[addr] = _balances[addr] - feedback + 1
            # Refund query cost for feedback
            _balances[addr] += 1
        elif feedback == 1:
            # Refund query cost for feedback
            _balances[addr] += 1

    #adjust vector of query
    await query_update_model(feedback, query_vec, content, neighbor, new_query_vec)

    task.set_name(task.get_name() + '_b4forward')
    # Forward feedback
    if other_qid is not None:
        new_feedback = (max(feedback - 1, 1) if feedback > 0 else 0).to_bytes(feedback_len, 'big')
#
#
#
        ####### THIS IS WHERE THE QUERY GETS FORWARDED ##########
        await request_send_feedback(*neighbor, other_qid, new_feedback)


    message = "ack".encode()
    message_len = len(message)
    writer.write(b'\x00' + message_len.to_bytes(4, 'big') + message)
    await writer.drain()
    task.set_name(task.get_name() + '_finished')


async def request_send_feedback(host, port, qid, feedback):
    task = asyncio.current_task()
    if type(feedback) is not bytes:
        feedback = feedback.to_bytes(8, 'big')
    reader, writer = await asyncio.open_connection(host, port)
    peername = writer.get_extra_info("peername")

    
    sig = await wallet_sign_bytes(qid + feedback) 
    addr_len = len(OWN_ADDR)
    sig_len = len(sig)
    qid_len = len(qid)
    feedback_len = len(feedback)
    message = REQUEST_FEEDBACK.to_bytes(1, 'big') + \
                addr_len.to_bytes(4, 'big') + OWN_ADDR + \
                sig_len.to_bytes(4, 'big') + sig + \
                qid_len.to_bytes(4, 'big') + qid + \
                feedback_len.to_bytes(4, 'big') + feedback 
    print(f'Sending feedback to {peername} from {writer.get_extra_info("sockname")}')
    writer.write(message)
    await writer.drain()
    print(f'Feedback sent')
    task.set_name(task.get_name() + '_here')
#
#
#
    ######### THIS IS THE LINE THAT FAILS ##############
    resp_code = await reader.read(1)
    task.set_name(task.get_name() + '_here2')
    resp_code = int.from_bytes(resp_code, 'big')
    task.set_name(task.get_name() + '_here3')
    print(f'Response code: {resp_code}')
    message_len = int.from_bytes(await reader.read(4), 'big')
    print(f'Message len: {message_len}')
    message = (await reader.read(message_len)).decode()
    if resp_code == 0:
        print(f"Response from {peername} to feedback: {message}")
    else:
        print(f'Error giving feedback to {peername}: {message}')
    writer.close()
    await writer.wait_closed()

任务名称的更改是我用来查找错误发生位置的。被取消调用错误的任务的名称总是 任务-XXX_8_started_gotbytes_b4forward_here 用数字代替 XXX.

为什么尝试从阅读器读取时任务被取消? 有没有办法允许相同的递归行为而不会导致错误?

我能找到的关于“Pending Task Was Destroyed”错误的所有内容都谈到了如何正确创建任务,以便垃圾收集器不会删除它们,但我没有手动创建任务。我必须做些什么才能将任务保存在使用 asyncio 的 start_server 制作的服务器上吗?

感谢您的宝贵时间,并对文字墙表示歉意。

python server python-asyncio p2p
© www.soinside.com 2019 - 2024. All rights reserved.