我需要机器人响应 /start 命令,但我不知道如何在主 bot 文件中正确注册并使用这个处理程序。
start.py:
from aiogram import types
from bot import router
from aiogram.filters import CommandStart
@router.message(CommandStart())
async def start(message: types.Message) -> None:
    """Reply "hello" to the user who sent the /start command.

    Defined at module level so that ``from App.start import start`` resolves.
    Importing this module is what attaches the handler to ``router``.

    Args:
        message: The incoming message that matched ``CommandStart()``.
    """
    await message.reply("hello")


class StartCommandHandler:
    """Namespace kept so ``StartCommandHandler.start`` references still work."""

    # staticmethod: the handler takes only the message, there is no instance
    # to bind.
    start = staticmethod(start)
bot.py:
import logging
import sys
from typing import List
from aiohttp import web
from aiogram import Bot, Dispatcher, Router
from aiogram.enums import ParseMode
from aiogram.webhook.aiohttp_server import SimpleRequestHandler, setup_application
from Config.EnvService import Env
from Config.ServerService import Server
# Shared router: handler modules import this object and attach their handlers to it.
router = Router()
class BotMars(Bot):
    """Telegram bot that receives updates through an aiohttp webhook server.

    Attributes:
        WEBHOOK_PATH: Path of the webhook endpoint, built once at import time
            from the ``BOT_TOKEN`` environment value.
        APP: aiohttp application shared by every instance (``app`` is the
            same object under a second name).
    """

    WEBHOOK_PATH = Server.createWebhookPATH(Env.get("BOT_TOKEN"))
    APP = app = web.Application()

    def __init__(self, token, dp):
        """Configure the bot and remember the dispatcher that serves it.

        Args:
            token: Bot API token.
            dp: Dispatcher that will process the incoming updates.
        """
        # Initialise the Bot base class. Without this call the instance is an
        # unconfigured Bot and every inherited API method fails on it.
        super().__init__(
            token,
            parse_mode=ParseMode.HTML,
            session=Server.getServerSession(),
        )
        self.TOKEN = token
        self.DP = dp
        # Kept for backward compatibility: the instance itself is the
        # configured bot, so BASEBOT simply points back at it.
        self.BASEBOT = self

    async def on_startup(self):
        """Set the Telegram webhook URL and mount the webhook route on APP.

        Intended to be registered as a dispatcher startup callback.
        """
        await self.BASEBOT.set_webhook(
            url=f"{Server.getWebhookURL()}{self.WEBHOOK_PATH}",
            drop_pending_updates=True,
        )
        webhook_requests_handler = SimpleRequestHandler(
            dispatcher=self.DP,
            bot=self.BASEBOT,
        )
        webhook_requests_handler.register(self.APP, path=self.WEBHOOK_PATH)

    def webhook_handler(self):
        """Bind the dispatcher to the aiohttp app and run the web server.

        ``web.run_app`` blocks, so nothing placed after a call to this method
        runs until the server stops.
        """
        setup_application(self.APP, self.DP, bot=self.BASEBOT)
        web.run_app(
            self.APP,
            host=Server.getServerHost(),
            port=int(Server.getServerPort()),
        )
if __name__ == "__main__":
    # Importing the handler module is what runs its @router.message(...)
    # decorators, so it has to happen BEFORE the blocking web.run_app() call
    # inside webhook_handler(); an import placed after it never executes while
    # the server is running.
    #
    # The router is taken from App.start on purpose: this file runs as
    # "__main__", while `from bot import router` in App/start.py loads a
    # second copy of it under the name "bot". The handlers are attached to
    # that copy's router, which is a different object from `router` above.
    from App.start import router as handlers_router

    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    dp = Dispatcher()
    dp.include_router(router)
    dp.include_router(handlers_router)
    bot = BotMars(Env.get("BOT_TOKEN"), dp)
    dp.startup.register(bot.on_startup)
    bot.webhook_handler()
我尝试像以前使用函数式写法时那样,直接用 * 导入所有内容,但没有成功,请帮帮我 :)
您应该阅读有关魔法过滤器等的内容。
以下是从另一个文件注册事件处理程序的示例:
bot.py
import asyncio
import logging
from aiogram import Bot, Dispatcher, F
from aiogram.filters import Command
# Import the necessary functions
from handlers import get_start, get_message
# Bot API token; empty placeholder here, presumably to be filled in before running.
TOKEN = ''
# Bot instance created with HTML as its parse mode.
bot = Bot(TOKEN, parse_mode='HTML')
# Dispatcher on which the handlers are registered and polling is started.
dispatcher = Dispatcher()
async def update():
    """Configure logging, register the handlers and poll Telegram for updates.

    Returns once polling has been stopped.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    )
    # Registering event handlers. The command handler goes first because a
    # "/start" message is also plain text and would otherwise match F.text.
    dispatcher.message.register(get_start, Command(commands=['start']))
    dispatcher.message.register(get_message, F.text)
    # start_polling() keeps running until it is stopped; wrapping it in
    # `while True` would restart polling after every requested shutdown.
    await dispatcher.start_polling(bot)


if __name__ == '__main__':
    asyncio.run(update())
handlers.py
from aiogram.types import Message
async def get_start(message: Message):
    """Answer the incoming message with the fixed "start" confirmation text."""
    confirmation = 'You called the "start" command'
    await message.answer(confirmation)
async def get_message(message: Message):
    """Answer the incoming message with the fixed text-message acknowledgement."""
    acknowledgement = 'You wrote me a text message'
    await message.answer(acknowledgement)
链接: