如何编写中间件将媒体组转发到 aiogram 3? 组中的用户向机器人发送命令。机器人将向他们发送一条消息,他们必须通过媒体组回复该消息,然后机器人会将其转发给使用该机器人的每个人。但由于向机器人发送媒体组需要多条消息,因此我需要一个中间件来收集所有消息,然后将它们发送给用户
我尝试使用 aiogram 2 已有的代码,但我不明白如何为版本 3 重做它
简单媒体中间件:
class MediaMiddleware(BaseMiddleware):
def __init__(self):
self.medias = {}
super(ThrottlingMiddleware, self).__init__()
async def __call__(
self,
handler: Callable[[Union[Message, CallbackQuery], Dict[str, Any]], Awaitable[Any]],
event: Union[Message, CallbackQuery],
data: Dict[str, Any]
) -> Any:
if isinstance(event, Message) and event.media_group_id:
self.medias.setdefault(event.media_group_id, [])
if event.message_id not in self.medias[event.media_group_id]:
self.medias[event.media_group_id].append(event)
last_len = len(self.medias[event.media_group_id])
await asyncio.sleep(2.5)
if last_len != len(self.medias[event.media_group_id]):
print('return', last_len, len(self.medias[event.media_group_id]))
return # skip this event
data["media_events"] = self.medias.pop(event.media_group_id)
return await handler(event, data)
用途:
@router.message(F.media_group_id != None)
def on_media_group_id(message: Message, media_events: List[Message] = []):
# message - last sended message
# media_events - all events with the same media_group_id
...