Python TelegramBot 在 job_queue.run_repeating 中运行函数

问题描述 投票:0回答:1

我想以一定的时间间隔向该人发送一条消息,当 check_who_likes 中的代码单独工作时没有问题,但是当我将其作为一个函数并尝试使用 job_queue 运行它时,我收到错误。

async def check_who_likes(update: Update, context: CallbackContext):
    try:
        conn = connection_pool.get_connection()
        cursor = conn.cursor()
        user_id = update.message.from_user.id
        cursor.execute("SELECT LikeUserID FROM Likes WHERE LikedUserID = %s", (user_id,))
        likes = cursor.fetchall()
        like_ids = [like[0] for like in likes]
        like_ids_str = ', '.join(map(str, like_ids))
        sql_query = f"SELECT UserName FROM Users WHERE PersonID IN ({like_ids_str})"
        cursor.execute(sql_query)
        users = cursor.fetchall()
        liked_users_text = ", ".join(str(user[0]) for user in users)
        await context.bot.send_message(chat_id=user_id,
                                       text=f"{len(likes)} kişi sizi beğendi.Bakmak istermisin?\n\n\n1.Göster.\n2.Artık aramıyorum.")

    except Exception as e:
        pass
    finally:
        cursor.close()
        conn.close()
        await asyncio.sleep(1)
        if update.message.text == "1":
            return SHOW_WHO_LIKES
async def matching(update: Update, context: CallbackContext):
    context.job_queue.run_repeating(check_who_likes, interval=1, first=0)

错误:
没有注册错误处理程序,记录异常。 回溯(最近一次调用最后一次): 文件“C:\wamp64\www ot env\lib\site-packages elegram xt_jobqueue.py”,第 898 行,在 _run 中 等待 self.callback(上下文) 类型错误:check_who_likes() 缺少 1 个必需的位置参数:'context'

python telegram telegram-bot python-telegram-bot job-queue
1个回答
0
投票

您的回拨无效

来自 docs 回调签名应该是

async def callback(context: CallbackContext)

但是你的代码是

async def check_who_likes(update: Update, context: CallbackContext):

首先您需要更改

check_who_likes
的签名以更正签名

async def check_who_likes(context: CallbackContext, update: Update):

接下来,如果你想要其他参数,有2个选项,你可以尝试:

选项 1:通过在 job_kwargs
 中使用 
JobQueue.run_repeating
传递更多 kwargs
async def matching(context: CallbackContext):
    context.job_queue.run_repeating(check_who_likes, interval=1, first=0, job_kwargs=dict(update=update))
选项 2:使用
functools.partial
功能
from functools import partial

async def matching(context: CallbackContext):
    context.job_queue.run_repeating(partial(check_who_likes, update=update), interval=1, first=0)
© www.soinside.com 2019 - 2024. All rights reserved.