MediaGrop 中间件 Aiogram 3

问题描述 投票:0回答:1

如何编写中间件将媒体组转发到 aiogram 3? 组中的用户向机器人发送命令。机器人将向他们发送一条消息,他们必须通过媒体组回复该消息,然后机器人会将其转发给使用该机器人的每个人。但由于向机器人发送媒体组需要多条消息,因此我需要一个中间件来收集所有消息,然后将它们发送给用户

我尝试使用 aiogram 2 已有的代码,但我不明白如何为版本 3 重做它

python telegram middleware media aiogram
1个回答
0
投票

简单媒体中间件:

class MediaMiddleware(BaseMiddleware):
    def __init__(self):
        self.medias = {}
        super(ThrottlingMiddleware, self).__init__()


    async def __call__(
        self,
        handler: Callable[[Union[Message, CallbackQuery], Dict[str, Any]], Awaitable[Any]],
        event: Union[Message, CallbackQuery],
        data: Dict[str, Any]
    ) -> Any:

        if isinstance(event, Message) and event.media_group_id:
            self.medias.setdefault(event.media_group_id, [])

            if event.message_id not in self.medias[event.media_group_id]:
                self.medias[event.media_group_id].append(event)
            
            last_len = len(self.medias[event.media_group_id])
            await asyncio.sleep(2.5)

            if last_len != len(self.medias[event.media_group_id]):
                print('return', last_len, len(self.medias[event.media_group_id]))
                return # skip this event
            
            data["media_events"] = self.medias.pop(event.media_group_id)

        return await handler(event, data) 

用途:

@router.message(F.media_group_id != None)
def on_media_group_id(message: Message, media_events: List[Message] = []):
    # message - last sended message
    # media_events - all events with the same media_group_id
    ...
© www.soinside.com 2019 - 2024. All rights reserved.