如何在 APScheduler 作业中使用异步类方法?

问题描述 投票:0回答:1

我使用 APScheduler 的 AsyncIOScheduler 和 AsyncIOExecutor 作为机器人的一部分。一个简单的测试函数,在几秒钟后发送一条“pong”消息,工作正常,但异步类方法什么也不做,并记录一个运行时警告,表明协程从未等待过。

我正在使用相当小的 APS 设置,并使用 SQLite DB 作为轻量级作业持久层:

def new_scheduler():
    jobstores = {
        "memory": MemoryJobStore(),
        "default": SQLAlchemyJobStore(url="sqlite:///.jobs.sqlite"),
    }
    executors = {
        "default": AsyncIOExecutor(),
    }
    return AsyncIOScheduler(
        jobstores=jobstores, executors=executors, timezone=datetime.timezone.utc
    )

问题似乎来自使用类方法,因为使用在任何类外部定义的异步函数来调度一个简单的任务,效果很好。

所涉及的类方法 (

ModAction.lift
) 是对用户施加限制的基类的一部分,该基类负责删除限制并将其从数据库中删除。

当我应用限制时,如果附加了到期日期,我会在调度程序上创建一个作业来调用

lift
类方法,并存储其 ID,因此如果在到期之前手动解除限制,我们可以取消它:

class ModAction(Document): # Document inherits from pydantic.BaseModel, providing important fields and methods for interacting with the database.
    # FakeMember is a serialization class that reduces Members to a user and guild ID
    issuer: FakeMember
    target: FakeMember
    reason: Optional[str] = None
    expires: Optional[datetime.datetime] = None
    lift_job_id: Optional[str] = None

    async def apply(self, bot: Bot, no_dm: bool = False):
        if not no_dm:
            await try_dm(await self.target.get_or_fetch(bot), embeds=[self.dm_embed])
        if self.expires is not None and self.expires > now():
            self.lift_job_id = bot.scheduler.add_job(
                self.__class__.lift,
                kwargs={
                    "id": self.id,
                    "reason": "Expired",
                },
                trigger="date",
                next_run_time=self.expires,
            ).id
        await bot.log(embed=self.apply_log_embed(bot))
        await self.store(bot.db_connection)

    @classmethod
    @validate_call
    async def lift(
        cls,
        id: DocumentId,
        reason: Optional[str] = None,
        member: Optional[FakeMember] = None,
    ):
        from cogs.mod import _bot # The bot that loads the extension is stored in this global variable.

        self = await cls.fetch(id, _bot.db_connection)
        await _bot.log(embed=self.lift_log_embed(_bot, reason, member))
        await _bot.db_connection.delete(id)

不同的 mod 操作有不同的执行方式——因此我使用类方法。以一个简单的服务器禁令为例:

class ServerBan(ModAction):
    async def apply(self, bot: Bot, no_dm: bool = False):
        await super().apply(bot, no_dm)
        await self.target.get(bot).ban(reason=self.apply_audit_reason(bot))

    @classmethod
    @validate_call
    async def lift(
        cls,
        id: DocumentId,
        reason: Optional[str] = None,
        member: Optional[FakeMember] = None,
    ):
        from cogs.mod import _bot

        self: cls = await cls.fetch(id, _bot.db_connection)
        await super().lift(id, reason, member)
        await self.target.get(_bot).unban(
            reason=cls.lift_audit_reason(_bot, reason, member)
        )

这是APS的东西吗?使用类方法?或者我只是错误地使用它们? 如果需要,我愿意尝试其他解决方案来处理这个问题——我并没有特别投入使用 APScheduler,因为该机器人仍处于早期开发阶段。 无论是建议修复 APS 还是全新的解决方案,我都洗耳恭听。

python python-asyncio apscheduler
1个回答
0
投票

再次回答我自己的问题——我找到了解决方法。我没有直接添加类方法作为作业的回调,而是为自动提升创建了一个静态方法,该方法将子类作为参数,并调用子类上的

lift
类方法。我的最终代码如下所示:

class ModAction(Document):
    """A base class for disciplinary actions issued by server staff against other members."""
    # --snip--
    async def apply(self, bot: Bot, no_dm: bool = False):
        if not no_dm:
            await try_dm(await self.target.get_or_fetch(bot), embeds=[self.dm_embed])
        if self.expires is not None and self.expires > now():
            self.lift_job_id = bot.scheduler.add_job(
                self.auto_lift,
                kwargs={"subclass": self.__class__, "id": self.id},
                trigger="date",
                next_run_time=self.expires,
            ).id
        await bot.log(embed=self.apply_log_embed(bot))
        await self.store(bot.db_connection)

    @staticmethod
    async def auto_lift(subclass: Type["ModAction"], id: DocumentId, reason = "Expired"):
        await subclass.lift(id, reason=reason)

    @classmethod
    @validate_call
    async def lift(
        cls,
        id: DocumentId,
        reason: Optional[str] = None,
        member: Optional[FakeMember] = None,
    ):
        from cogs.mod import _bot

        obj = await cls.fetch(id, _bot.db_connection)
        await _bot.log(embed=obj.lift_log_embed(_bot, reason, member))
        await _bot.db_connection.delete(id)
© www.soinside.com 2019 - 2024. All rights reserved.