Python Asyncio错误:“ OSError:[WinError 6]句柄无效”和“ RuntimeError:事件循环已关闭”]]

问题描述 投票:0回答:1

更新

解决此问题的方法是在获取行asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())之前添加loop = asyncio.get_event_loop()

我正在查看代码,还看到另一行对我有帮助。 试试这个代替 我希望这将asyncio.set_event_loop(asyncio.ProactorEventLoop()) 帮助您– Gerrit Geeraerts

我在正确使用我的代码时遇到了一些困难,因为在VSCode上调试时我的代码完成执行后出现以下错误:

Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000188AB3259D0>
Traceback (most recent call last):
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 116, in __del__
    self.close()
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 719, in call_soon
    self._check_closed()
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

如果我使用命令行运行代码,也会收到以下错误:

Cancelling an overlapped future failed
future: <_OverlappedFuture pending overlapped=<pending, 0x25822633550> cb=[_ProactorReadPipeTransport._loop_reading()]>
Traceback (most recent call last):
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\windows_events.py", line 66, in _cancel_overlapped
    self._ov.cancel()
OSError: [WinError 6] The handle is invalid

以下代码是Reddit的异步API包装器。由于Reddit要求漫游器限制为每分钟60个请求,因此我决定在单独的线程中实现一个处理循环的队列,以处理请求。

要运行它,您需要create a Reddit app并使用您的登录凭据以及机器人ID和密码。

如果有帮助,我正在Windows 10上使用Python 3.8.3 64位。

import requests
import aiohttp
import asyncio
import threading
from types import SimpleNamespace
from time import time

oauth_url = 'https://oauth.reddit.com/'
base_url = 'https://www.reddit.com/'

agent = 'windows:reddit-async:v0.1 (by /u/UrHyper)'


class Reddit:
    def __init__(self, username: str, password: str, app_id: str, app_secret: str):
        data = {'grant_type': 'password',
                'username': username, 'password': password}
        auth = requests.auth.HTTPBasicAuth(app_id, app_secret)
        response = requests.post(base_url + 'api/v1/access_token',
                                 data=data,
                                 headers={'user-agent': agent},
                                 auth=auth)
        self.auth = response.json()

        if 'error' in self.auth:
            msg = f'Failed to authenticate: {self.auth["error"]}'
            if 'message' in self.auth:
                msg += ' - ' + self.auth['message']
            raise ValueError(msg)

        token = 'bearer ' + self.auth['access_token']
        self.headers = {'Authorization': token, 'User-Agent': agent}

        self._rate = 1
        self._last_loop_time = 0.0
        self._loop = asyncio.new_event_loop()
        self._queue = asyncio.Queue(0)
        self._end_loop = False
        self._loop_thread = threading.Thread(target=self._start_loop_thread)
        self._loop_thread.start()

    def stop(self):
        self._end_loop = True

    def __del__(self):
        self.stop()

    def _start_loop_thread(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_until_complete(self._process_queue())

    async def _process_queue(self):
        while True:
            if self._end_loop and self._queue.empty():
                await self._queue.join()
                break
            start_time = time()
            if self._last_loop_time < self._rate:
                await asyncio.sleep(self._rate - self._last_loop_time)
            try:
                queue_item = self._queue.get_nowait()
                url = queue_item['url']
                callback = queue_item['callback']
                data = await self._get_data(url)
                self._queue.task_done()
                callback(data)
            except asyncio.QueueEmpty:
                pass
            finally:
                self._last_loop_time = time() - start_time

    async def _get_data(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=self.headers) as response:
                assert response.status == 200
                data = await response.json()
                data = SimpleNamespace(**data)
                return data

    async def get_bot(self, callback: callable):
        url = oauth_url + 'api/v1/me'
        await self._queue.put({'url': url, 'callback': callback})

    async def get_user(self, user: str, callback: callable):
        url = oauth_url + 'user/' + user + '/about'
        await self._queue.put({'url': url, 'callback': callback})


def callback(data): print(data['name'])


async def main():
    reddit = Reddit('', '', '', '')

    await reddit.get_bot(lambda bot: print(bot.name))
    reddit.stop()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    

更新此问题的解决方案是在获取线路循环= asyncio.get_event_loop()之前添加asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy():我正在寻找...

python python-3.x visual-studio-code python-asyncio python-multithreading
1个回答
0
投票
我在异步方面遇到了类似的问题。
© www.soinside.com 2019 - 2024. All rights reserved.