python-asyncio 相关问题

此标记用于asyncio Python包,它提供了编写单线程并发代码的机制。 asyncio包提供从Python 3.4开始的异步I / O,事件循环,协同程序和任务。

带有 pytest 的异步装置

如何定义异步装置并在异步测试中使用它们? 以下代码全部位于同一个文件中,严重失败。该装置是否由测试运行者明确调用并且不等待? @pytest.fi...

回答 4 投票 0

如何接受成员对象作为可选参数?

@bot.commands() 异步 def 示例(ctx,成员:discord.Member = ctx.message.author): 这基本上就是我想要的,如果没有使用参数,它将默认为消息作者。看来我...

回答 1 投票 0

如何在多线程中运行aioschedule

导入异步 导入时间 导入aioschedule 从日期时间导入日期时间 异步 def work_1(): print('工作1', datetime.now()) 等待 asyncio.sleep(30) 异步 def work_2(): 优先...

回答 1 投票 0

让 scrapy 和 pytest 与 AsyncioSelectorReactor 一起使用

重现我的问题 蟒蛇3.12.1 爬虫2.11.2 pytest 8.2.1 在 bookspider.py 中我有: 从输入 import Iterable 导入scrapy 从 scrapy.http 导入请求 类 BookSpider(scrapy.Spider)...

回答 1 投票 0

如何将消息作者设置为默认成员参数?不和谐.py

@bot.commands() 异步 def 示例(ctx,成员:discord.Member = ctx.message.author): 这基本上就是我想要的,如果没有使用参数,它将默认为消息作者。看来我...

回答 1 投票 0

如何模拟修补类的异步实例方法?

(以下代码可以在Jupyter中运行。) 我有一个B类,它使用A类,需要测试。 A类: 异步 def f(self): 经过 B类: 异步 def f(self): a = A() ...

回答 3 投票 0

如何从Python中的异步函数获取打印响应

我想获得同步功能的打印响应。因为我正在使用, 字符串IO。 导入系统 从 io 导入 StringIO def 样本(): print('开始获取日志!') 打印('响应:正文') 打印('st...

回答 1 投票 0

将 TaskGroup 与 Motor Aggregate 一起使用时出现空迭代器

使用任务组迭代聚合结果时,我收到 StopIteration 错误,但其他情况下则不会。我创建了一个最小的示例来显示错误: 导入异步 来自电机。

回答 1 投票 0

如何在再次按下按钮时取消异步协程?

再次单击按钮时如何取消已在运行的任务? 我已经草拟了代码,但它肯定不起作用 类任务管理器: def __init__(self) -> 无: ...

回答 1 投票 0

验证证书失败后从socket对象获取ssl客户端证书

我有一个用 python 编写的套接字服务器,接受如下连接: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 将套接字设置为非阻塞 self.socket.setblocking(False)...

回答 1 投票 0

Pyrogram 中出现错误“RuntimeError: Timeout context manager should be use inside a task”

我在main.py文件中有2个线程,第一个是通过Thread(bot线程)创建的,第二个是主要的python线程,Pyrogram在其中运行main.py代码的屏幕截图 但是当我运行代码时,

回答 1 投票 0

为什么在asyncio.run()内部调用async函数?

Python 文档给出了以下代码示例: 导入异步 异步 def main(): 打印('你好...') 等待 asyncio.sleep(1) print('...世界!') asyncio.run(主()) 为什么 main 在

回答 1 投票 0

为什么在asyncio.run()内部调用async函数?

Python 文档给出了以下代码示例: 导入异步 异步 def main(): 打印('你好...') 等待 asyncio.sleep(1) print('...世界!') asyncio.run(主()) 为什么 main 在

回答 1 投票 0

如何正确处理asyncio.TimeoutError?从未检索到任务异常

我正在将 asyncio 和 aiohttp 用于需要发送许多并发 HTTP 头请求并收集结果的脚本。当其中一个头请求发生异常时,它会立即发生

回答 1 投票 0

在Python异步代码中跟踪单独的流程

我有一些 python 异步代码,可以发出日志行来记录其进度。但是,同一代码的多个实例可能同时运行,因此日志行也会混合在一起。我会...

回答 1 投票 0

Python 单元测试协程模拟列表

我尝试编写异步代码,它工作正常(Python 3.12),但我在测试方面遇到了困难。 这是我想测试的功能: 异步 def get_repositories( gl_client: gitlab.client.Gitl...

回答 2 投票 0

在discord.py中停止机器人命令

所以我写了一个机器人,它会像这样发送垃圾邮件: @bot.command() 异步定义开始(ctx): 而真实: wait ctx.send("请使用正确的渠道进行讨论") 等待异步。

回答 2 投票 0

如何在Python中订阅NATS主题并持续接收消息?

我尝试了下面的示例(来自此页面): nc = NATS() 等待 nc.connect(服务器=["nats://demo.nats.io:4222"]) 未来 = asyncio.Future() 异步 def cb(msg): 非本地未来 未来。

回答 3 投票 0

为什么我的websocket协程在下面的代码中没有被调用?

我正在使用 alpaca-py(Alpaca 的新 python 包)来创建一个基本的 tradebot。我的目标是让机器人进行交易(购买),从 Alpaca 的 webhook 获取有关订单是否正确的数据...

回答 2 投票 0

ContextVar 内存泄漏

下面的代码存在内存泄漏,我不明白为什么会有对MyObj的引用。 run(1) 和 run(2) 完成,上下文被清除。 导入异步 导入气相色谱 从 contextvars 导入

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.