为什么我的websocket协程在下面的代码中没有被调用?

问题描述 投票:0回答:2

我正在使用 alpaca-py(Alpaca 的新 python 包)来创建一个基本的 tradebot。我的目标是让机器人进行交易(买入),从 Alpaca 的 webhook 获取有关订单是否已完成的数据(以及其他一些信息),然后使用相同的股票进行另一笔交易(卖出)。在尝试集成 Webhook 之前,机器人的买卖情况良好。然而,我似乎无法启动并运行协程。

我尝试过以下方法:

  1. 将await语句移至协程中的不同区域。
  2. 更改方法的位置并从各种方法中删除异步。
  3. 查看 Alpaca 的文档。 (不幸的是,alpaca-py 于 2023 年推出,他们的很多文档都已经过时了)
  4. 阅读 TradingStream 代码以确保我所做的一切都是正确的。一切看起来都不错。
  5. 更改 asyncio.gather 调用并将它们作为例程运行。我得到了同样的结果。
  6. 将记录器语句添加到代码中。这告诉我,我的方法“trade_update_handler”没有被调用,因为没有任何内容打印到控制台。
  7. 使用“run()”而不是“_run_forever()”,但这会导致 webhook 端出现错误。

我使用 Django 来运行机器人,因为我喜欢它的 BaseCommand 类。我不认为 django 与这个问题有任何关系。这是我的代码:

class TradeMaker():
    def __init__(self, **kwargs):
        self.paper_bool = kwargs.get('paper_bool', True)
        self.random_bool = kwargs.get('random', True)
        self.symbol_or_symbols = kwargs.get('symbol_or_symbols', 'AAPL')
        self.amount = kwargs.get('amount', 40000)
        self.seconds_between = kwargs.get('seconds_between', 4)
        self.log = kwargs.get('log')
        self.trading_client, self.trading_stream, self.account = self.open_client()
        self.trade_update_info = None
        self.order_filled = False
        self.shares_bought = 0
        self.current_symbol = None
    
    def open_client(self):
        trading_client = TradingClient(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        try:
            account = trading_client.get_account()
        except Exception as e:
            logger.error(f"Exception in login: {e}")
        return trading_client, trading_stream, account
    
    async def trade_update_handler(self, data):
        logger.info('Trade Update called')
        print("Trade Update:", data)
        if data.event == TradeEvent.FILL:
            if data.order.side == OrderSide.BUY:
                self.order_filled = True
                self.shares_bought = data.order.filled_qty
                self.current_symbol = data.order.symbol

    async def run_stream(self):
        logger.info('Subscribing to trade updates')
        self.trading_stream.subscribe_trade_updates(self.trade_update_handler)
        logger.info('Preparing stream')
        await self.trading_stream._run_forever()

    async def stop_stream(self):
        logger.info('Stopping stream')
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        await trading_stream.stop()

    def get_symbol(self):
        if self.random_bool:
            symbol = random.choice(self.symbol_or_symbols)
            return symbol
        else:
            symbol = self.symbol_or_symbols
            return symbol

    def buy(self):
        symbol = self.get_symbol()
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=1,
            side=OrderSide.BUY,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_buy = self.trading_client.submit_order(
                    order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to buy {symbol}: {e}")
            return None
        return symbol, market_order_buy

    def sell(self, symbol):
        symbol = symbol
        shares = self.shares_bought
        
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=250,
            side=OrderSide.SELL,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_sell = self.trading_client.submit_order(
                    order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to sell {symbol}: {e}")
            return None
        return market_order_sell

    async def make_trades(self):
        market_close = datetime.datetime.now().replace(hour=14, minute=0, second=0, microsecond=0)
        while datetime.datetime.now() < market_close:
            seconds = self.seconds_between
            try:
                symbol, market_order_buy = self.buy()
                print(f"Bought {symbol}: {market_order_buy}")
            except Exception as e:
                logger.error(f"Failed to buy during trade: {e}")
                return None
            while not self.order_filled:
                logger.info('Waiting for order status update')
                await asyncio.sleep(1)
            sleep(seconds)
            try:
                market_order_sell = self.sell(symbol=symbol)
                print(f"Sold {self.current_symbol}: {market_order_sell}")
            except Exception as e:
                logger.error(f"Failed to sell during trade: {e}")
                return None
            self.order_filled = False
            self.shares_bought = 0
            sleep(seconds)
        print('Market closed, shutting down.')

class Command(BaseCommand):
    help = """This bot trades the target stock. If you want it to choose randomly, pass it a list and set the variable random=True
    """
    model = None

    def add_arguments(self, parser):
        parser.add_argument(
            '--paper',
            type=bool,
            help='Set false to live trade.',
            default=True
        )
        parser.add_argument(
            '--folder',
            type=str,
            help='source folder for files',
            default=''
        )
        parser.add_argument(
            '--symbol',
            type=str,
            help='target symbol, or list of symbols',
            default='AAPL'
        )
        parser.add_argument(
            '--random',
            type=bool,
            help="Set to true if passing a list of symbols to choose randomly from.",
            default=False
        )
        parser.add_argument(
            '--tradevalue',
            type=int,
            help="The amount the bot should trade. e.g. $40000",
            default=40000
        )
        parser.add_argument(
            '--seconds',
            type=int,
            help="The number of seconds the bot should wait between each trade.",
            default=4
        )

    def handle(self, **options):
        paper_bool = options['paper']
        random_bool = options['random']
        symbol_or_symbols = options['symbol']
        amount = options['tradevalue']
        seconds_between = options['seconds']
        log = options['folder']
        tm = TradeMaker(
            paper_bool = paper_bool,
            random = random_bool,
            symbol_or_symbols = symbol_or_symbols,
            amount = amount,
            seconds_between = seconds_between,
            log = log
        )
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(asyncio.gather(
                tm.run_stream(),
                tm.make_trades()
            ))
        except KeyboardInterrupt:
            tm.stop_stream()
            print("Stopped with Interrupt")
        finally:
            tm.stop_stream()
            loop.close()

当我运行命令时,我在终端中得到以下输出(为了安全起见,信息经过审查):

    python manage.py trade_maker_v5
2023-03-30 11:51:48,342 - INFO - Subscribing to trade updates
2023-03-30 11:51:48,342 - INFO - Preparing stream
Bought AAPL: id=UUID('foo') client_order_id='bar' created_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995853, tzinfo=datetime.timezone.utc) updated_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995921, tzinfo=datetime.timezone.utc) submitted_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 994623, tzinfo=datetime.timezone.utc) filled_at=None expired_at=None canceled_at=None failed_at=None replaced_at=None replaced_by=None replaces=None asset_id=UUID('foo') symbol='AAPL' asset_class=<AssetClass.US_EQUITY: 'us_equity'> notional=None qty='1' filled_qty='0' filled_avg_price=None order_class=<OrderClass.SIMPLE: 'simple'> order_type=<OrderType.MARKET: 'market'> type=<OrderType.MARKET: 'market'> side=<OrderSide.BUY: 'buy'> time_in_force=<TimeInForce.DAY: 'day'> limit_price=None stop_price=None status=<OrderStatus.PENDING_NEW: 'pending_new'> extended_hours=False legs=None trail_percent=None trail_price=None hwm=None
2023-03-30 11:51:48,480 - INFO - Waiting for order status update
2023-03-30 11:51:49,493 - INFO - Waiting for order status update
2023-03-30 11:51:50,500 - INFO - Waiting for order status update

如果我在机器人运行时在另一个终端中单独运行 webhook,它就会起作用。我可以运行以下代码:

    from alpaca.trading.stream import TradingStream

trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=True)

async def update_handler(data):
    print(data)

trading_stream.subscribe_trade_updates(update_handler)
trading_stream.run()

当我的机器人运行时,它将打印出所有数据。为什么它可以单独工作,而不是在协程中?

python asynchronous python-asyncio webhooks alpaca
2个回答
0
投票

Django 导致了这个问题。从 Django 中删除机器人并使其成为独立机器人并添加更多异步语句后,它现在可以工作了。

Django 最近在 4.2 中添加了异步,但该项目运行的是 Django 4.1


0
投票

如果有帮助的话,我遇到了类似的问题,我通过将 Trading_stream._run_forever() 包装在异步函数中来解决它。这是代码:

from alpaca.trading.stream import TradingStream
from alpaca.trading.client import TradingClient
from alpaca.trading.requests import LimitOrderRequest
import asyncio

API_KEY = ''
API_SECRET = ''

async def update_handler(data):
    # trade updates will arrive in our async handler
    print(data)

async def run_stream(trading_stream):
    await trading_stream._run_forever()

async def main():
    trading_stream = TradingStream(API_KEY, API_SECRET, paper=True)
    trading_stream.subscribe_trade_updates(update_handler)
    
    # Start the stream in a separate task and wait 3 seconds for the ws connection to establish
    asyncio.create_task(run_stream(trading_stream))
    await asyncio.sleep(3)

    # Send an order to check if we receive events    
    trading_client = TradingClient(API_KEY, API_SECRET, paper=True)
    order = LimitOrderRequest(symbol="TSLA", qty=1, side="buy", type="limit", limit_price=150, time_in_force="day", extended_hours=True)
    trading_client.submit_order(order_data=order)
    print("order sent")

    # Keep the main coroutine running indefinitely
    await asyncio.Event().wait()    

asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.