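I'm new to building bots. I created a very simple Telegram bot and it works fine, but I can't figure out how to make it send a message every n minutes or n hours once the /start_auto command is issued.
I made a workaround with a while loop, but it looks clumsy, and while the loop is running the user can't interact with the bot about anything else.
I want users to be able to start and stop this scheduled task with commands such as /start_auto and /stop_auto.
I know there are many other answered questions on this topic, but none of them seem to work with my code.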
import logging
import os
import time
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters
logger = logging.getLogger(__name__)
PORT = int(os.environ.get('PORT', '8443'))
def start(update, context):
    """Sends a message when the command /start is issued."""
    update.message.reply_text('Hi!')

def help(update, context):
    """Sends a message when the command /help is issued."""
    update.message.reply_text('Help!')

def start_auto(update, context):
    """Sends a message when the command /start_auto is issued."""
    n = 0
    while n < 12:
        time.sleep(3600)
        update.message.reply_text('Auto message!')
        n += 1

def error(update, context):
    """Logs Errors caused by Updates."""
    logger.warning('Update "%s" caused error "%s"', update, context.error)

def main():
    TOKEN = 'TOKEN_GOES_HERE'
    APP_NAME = 'https://my-tele-test.herokuapp.com/'
    updater = Updater(TOKEN, use_context=True)
    # Get the dispatcher to register handlers
    dp = updater.dispatcher
    dp.add_handler(CommandHandler("start", start))
    dp.add_handler(CommandHandler("help", help))
    dp.add_handler(CommandHandler("start_auto", start_auto))
    # log all errors
    dp.add_error_handler(error)
    updater.start_webhook(listen="0.0.0.0", port=PORT, url_path=TOKEN, webhook_url=APP_NAME + TOKEN)
    updater.idle()

if __name__ == '__main__':
    main()
Here is the solution I found:
def callback_auto_message(context):
    # The chat id is passed in through context= when the job is scheduled.
    context.bot.send_message(chat_id=context.job.context, text='Automatic message!')

def start_auto_messaging(update, context):
    chat_id = update.message.chat_id
    # Send every 10 seconds; the job is named after the chat so it can be found later.
    context.job_queue.run_repeating(callback_auto_message, 10, context=chat_id, name=str(chat_id))
    # context.job_queue.run_once(callback_auto_message, 3600, context=chat_id)
    # The next line needs `import datetime`:
    # context.job_queue.run_daily(callback_auto_message, time=datetime.time(hour=9, minute=22), days=(0, 1, 2, 3, 4, 5, 6), context=chat_id)

def stop_notify(update, context):
    chat_id = update.message.chat_id
    context.bot.send_message(chat_id=chat_id, text='Stopping automatic messages!')
    # Remove every job registered for this chat (there may be none).
    for job in context.job_queue.get_jobs_by_name(str(chat_id)):
        job.schedule_removal()
In the main function I registered the commands:
dp.add_handler(CommandHandler("auto", start_auto_messaging))
dp.add_handler(CommandHandler("stop", stop_notify))
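If a user sends /auto twice, two jobs end up scheduled under the same name. Below is a minimal sketch of a guard against that. It assumes the same python-telegram-bot v13 JobQueue calls used in this answer (`get_jobs_by_name`, `schedule_removal`, `run_repeating`). The helper names `remove_jobs` and `restart_auto_messaging` are hypothetical and not part of the library.

```python
def remove_jobs(job_queue, name):
    """Schedule removal of every job registered under `name`; return how many there were."""
    jobs = job_queue.get_jobs_by_name(name)
    for job in jobs:
        job.schedule_removal()
    return len(jobs)


def restart_auto_messaging(update, context, callback, interval=10):
    """Hypothetical /auto handler body: drop old jobs first, then schedule one new job."""
    chat_id = update.message.chat_id
    removed = remove_jobs(context.job_queue, str(chat_id))
    # Same call as in the answer: the chat id travels with the job via context=.
    context.job_queue.run_repeating(callback, interval, context=chat_id, name=str(chat_id))
    return removed
```

Because a handler is called with only `(update, context)`, this could be registered through a small wrapper such as `lambda u, c: restart_auto_messaging(u, c, callback_auto_message)`.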
Everything here is simple: you only need the time library and a while loop to send automatic messages. time.sleep(30) sets how often the message is sent, in seconds.
import telebot
from telebot import custom_filters
import time

bot = telebot.TeleBot("TOKEN FOR BOT")

# Chat id can be private or supergroups.
@bot.message_handler(chat_id=[123456789], commands=['start'])  # chat_id checks whether the id is in your list.
def admin_reply(message):
    bot.send_message(message.chat.id, "You are allowed to use this command.")
    while True:
        # Wait 30 seconds, then send the next message.
        time.sleep(30)
        bot.send_message(message.chat.id, "Allowed to receive an auto-message every 30 seconds.")

@bot.message_handler(commands=['start'])
def not_admin(message):
    bot.send_message(message.chat.id, "You are not allowed to use this command")
    while True:
        # Wait 60 seconds, then send the next message.
        time.sleep(60)
        bot.send_message(message.chat.id, "Allowed to receive an auto-message every minute.")

# Do not forget to register
bot.add_custom_filter(custom_filters.ChatFilter())
bot.polling(none_stop=True)
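The question mentions that a loop inside the handler gets in the way of other commands. One possible way around that is to move the loop onto a background thread that can be stopped again. The sketch below uses only the standard library. The `AutoSender` class and its `send` callable are hypothetical names, not part of any bot library.

```python
import threading


class AutoSender:
    """Calls `send()` every `interval` seconds on a background thread until stopped."""

    def __init__(self, send, interval):
        self._send = send
        self._interval = interval
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        """Start the loop; return False if it is already running."""
        if self._thread is not None and self._thread.is_alive():
            return False
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        return True

    def _run(self):
        # Event.wait returns True as soon as stop() is called, False on timeout.
        while not self._stop.wait(self._interval):
            self._send()

    def stop(self):
        """Ask the loop to finish and wait for the thread to exit."""
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
```

In a handler this might be used as `AutoSender(lambda: bot.send_message(chat_id, "Auto message"), 30).start()`, keeping one instance per chat in a dict so that a stop command can find it and call `stop()`.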
Why not use Pyrogram? It makes everything easier.
from pyrogram import Client
import asyncio

app = Client('my_account',          # Update this line with
             api_id=API_ID,         # your API_ID
             api_hash=API_HASH,     # your API_HASH
             bot_token=BOT_TOKEN)   # your BOT_TOKEN

async def main():
    # The client must be started before it can send anything.
    async with app:
        while True:
            await app.send_message(chat_id=123456789, text="This message is sent every 5 mins")
            await asyncio.sleep(300)  # 5 minutes = 300 seconds

app.run(main())
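Since Pyrogram is built on asyncio, the periodic loop could also be made stoppable with an `asyncio.Event`. This is a sketch using only the standard library. `send_periodically` is a hypothetical helper, and `send` stands for any coroutine function, for example one that wraps `app.send_message`.

```python
import asyncio


async def send_periodically(send, interval, stop_event):
    """Await `send()` every `interval` seconds until `stop_event` is set."""
    while not stop_event.is_set():
        await send()
        try:
            # Sleep for `interval`, but wake up early if a stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # Interval elapsed without a stop request; send again.
```

A /stop_auto handler would then only need to call `stop_event.set()`, and the loop ends without waiting out the rest of the interval.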