如何在单独线程内的同步函数中向用户发送消息?

问题描述 投票:0回答:1

我目前正在开发一个电报机器人。我想创建一个完全像 CLI 游戏一样的游戏,但您需要向 telegram 机器人发送消息才能玩它。我遇到了一个问题,我的游戏在无限 while 循环上运行,为了以非阻塞方式运行该游戏,我决定将其放在单独的线程中。问题是

context.bot.send_message(...)
不会在线程中发送任何消息,我怀疑这是因为
context.bot.send_message(...)
是异步的并且是从同步上下文中调用的。

这是我写的代码:

async def start_game_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    if context.user_data.get("game") is not None:
        await context.bot.send_message(chat_id=update.effective_chat.id, parse_mode="markdown", text="Game has already started!")
        return 

    game = Game()

    context.user_data["game"] = game
    
    game_process = threading.Thread(
        target=game.start,
        args=(
            functools.partial( 
                context.bot.send_message, # this is basically a `plugin_func()` from the next code block
                chat_id=update.effective_chat.id,
                parse_mode="markdown",
                text=f"```text {context.user_data['game'].get_graphics()}```"
            ),
        )
    )

    game_process.start()

此函数启动游戏并将函数

context.bot.send_message(...)
传递给线程,以便它可以插入到游戏的主 while 循环中。 这是一个
start
函数本身:

class Game:
    ...

    def start(self, plugin_func: Callable) -> None:
        self._game_started = True

        while self._game_started and not self._game_finished:
            self.__update_player_pos()
            self.__update_surroundings_position()
            plugin_func() # This is the place where it gives a warning
            print(self.get_graphics())
            time.sleep(self.game_pacing)

这是我收到的警告消息:

RuntimeWarning: coroutine 'ExtBot.send_message' was never awaited
  plugin_func()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

问题是游戏开始并且

print(self.get_graphics())
完美地打印出游戏的当前状态。但
plugin_func
不会在我的电报机器人中发送任何消息。

我认为问题在于

plugin_func
是异步的,我尝试从同步上下文中调用它,但我不想将我的
start
函数更改为异步,所以我应该做什么?

python asynchronous python-telegram-bot
1个回答
0
投票

如果不更改 Game 类中的代码,我找不到解决方案。但这就是我最终改变的内容(代码后的解释)

这是我的main.py:

async def start_game_handler(update: Update, context: ContextTypes.DEFAULT_TYPE, application):
    if context.user_data.get("game") is not None:
        await context.bot.send_message(chat_id=update.effective_chat.id, parse_mode="markdown", text="Game has already started!")
        return 

    game = Game()

    context.user_data["game"] = game
    chat_id = update.effective_chat.id
    
    game_process = threading.Thread(
        target=game.start,
        args=(
            functools.partial(
                context.bot.send_message,
                parse_mode="markdown",
                chat_id=chat_id
            ),
        )
    )

    game_process.start()

    print(f"Field:\n{context.user_data['game'].get_graphics()}")

我的游戏课:

class Game:
    ...

    def start(self, plugin_func: Callable) -> None:
        self._game_started = True
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        while self._game_started and not self._game_finished:
            self.__update_player_pos()
            self.__update_surroundings_position()
            asyncio.get_event_loop().run_until_complete(plugin_func(text=f"```text{self.get_graphics()}```"))
            print(self.get_graphics())
            time.sleep(self.game_pacing)
            
        loop.close()

因此,因为我在单独的线程内调用启动函数,所以我需要以某种方式在其中创建一个异步事件循环。如果我每次调用

plugin_func
时都创建一个事件循环,我认为它不会那么高效。因此,我在 while 循环之外创建了事件循环,并使用我创建的事件循环调用了我的
plugin_func

© www.soinside.com 2019 - 2024. All rights reserved.