from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
import time
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("start")
time.sleep(10) # a process that's going to take some time
await update.message.reply_text("finish")
app = ApplicationBuilder().token("TOKEN HERE").build()
app.add_handler(CommandHandler("start", start))
app.run_polling()
这是我目前在项目中面临的问题的最简单的例子
机器人必须执行一个需要一些时间才能完成的过程
但是机器人在此过程中停止响应其他用户
我什么都试过了
我尝试了旧版本的 python telegram bot
我尝试使用线程(这不适用于异步函数)和asyncio(我对这种东西不太熟悉,但由于某些原因仍然没有响应其他用户)
我什至尝试在“start”函数中创建两个函数(一个异步,一个非异步),然后通过普通函数的线程运行异步函数。
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
import time
import threading
import asyncio
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
async def thread1(upd: Update, ctx: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text('start')
time.sleep(10)
await update.message.reply_text('finish')
def thread_the_thread(upd: Update, ctx: ContextTypes.DEFAULT_TYPE):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(thread1(upd, ctx))
loop.close()
t = threading.Thread(target=thread_the_thread, args=(update, context))
t.start()
app = ApplicationBuilder().token("TOKEN HERE").build()
app.add_handler(CommandHandler("start", start))
app.run_polling()
但是当我与两个不同的用户一起使用机器人时......
telegram.error.NetworkError: Unknown error in HTTP implementation: RuntimeError('<asyncio.locks.Event object at 0x0000022361E0A920 [unset]> is bound to a different event loop')
不知何故,有关 PTB (python-telegram-bot) 中异步工作的相关文档确实很难用谷歌搜索。提示:使用“concurrency”关键字。
官方文档中您问题的答案是here。
在跳转到 PTB 本身之前,值得一提的是您使用 blocking time.sleep() 函数。为了使其异步,您应该使用适当的函数:asyncio.sleep()。 你可以参考那个讨论。
对于PTB并发:有2种方法可以让机器人异步运行:
1。块=假
添加处理程序时使用“block=False”选项:
app.add_handler(CommandHandler("start", start, block=False))
对此要谨慎(引自 PTB 文档):
但是,通过在处理程序中使用 block=False,您不能再依赖于依次调用不同组中的处理程序。根据您的用例,这可能是一个问题。因此,PTB 提供了第二种选择。
2。并发更新
在构建应用程序实例期间激活并发更新选项:
app = ApplicationBuilder().token('TOKEN HERE').concurrent_updates(True).build()
这是我的一段代码,它按照您的预期运行(我使用 Loguru 进行日志记录,为了简单起见,您可以将其替换为 print() ):
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
import asyncio
import random
import sys
from loguru import logger
COUNTER = 0
TOKEN = "TOKEN HERE"
logger.add(sys.stdout, level="TRACE", format="<green>{time}</green> | <blue>{function}</blue> | {message}", serialize=False)
async def logic(num: int) -> int:
"""Some logic to run"""
delay = random.randint(5, 10)
logger.trace(f"Logic-{num} to be delayed for {delay} seconds")
await asyncio.sleep(delay)
return delay
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
msg = 'started'
logger.trace(msg)
await context.bot.send_message(
chat_id=update.message.chat_id,
text=msg
)
global COUNTER
result = await logic(COUNTER)
COUNTER +=1
msg = f"Finished after {result} seconds of working"
await context.bot.send_message(
chat_id=update.message.chat_id,
text=msg
)
logger.trace(msg)
def main():
# option 1: block=False
# app = ApplicationBuilder().token(TOKEN).build()
# app.add_handler(CommandHandler("start", start, block=False))
# option 2: set concurrent_updates
app = ApplicationBuilder().token(TOKEN).concurrent_updates(True).build()
app.add_handler(CommandHandler("start", start))
app.run_polling()
if __name__ == '__main__':
main()
希望有帮助。