我正在使用 alpaca-py(Alpaca 新推出的 Python 包)编写一个基础的交易机器人。我的目标是让机器人先下一笔买单,通过 Alpaca 的 webhook 获取该订单是否已成交的数据(以及其他一些信息),然后对同一只股票再下一笔卖单。在尝试集成 webhook 之前,机器人的买入和卖出都运行正常。然而,集成之后我似乎无法让协程正常运行起来。
我尝试过以下方法:
我使用 Django 来运行机器人,因为我喜欢它的 BaseCommand 类。我不认为 django 与这个问题有任何关系。这是我的代码:
class TradeMaker():
    """Round-trip trader: buy a symbol, wait for its fill event, then sell it.

    Orders are submitted through the REST ``TradingClient``; fill
    notifications arrive through the ``TradingStream`` and are recorded on the
    instance by :meth:`trade_update_handler`. :meth:`run_stream` and
    :meth:`make_trades` are coroutines meant to run concurrently on the same
    event loop.

    Keyword Args:
        paper_bool: Passed as ``paper=`` to both Alpaca clients (default True).
        random: When true, the symbol is picked at random from
            ``symbol_or_symbols`` (default True).
        symbol_or_symbols: A ticker string, or a collection of tickers to
            choose from when ``random`` is set (default ``'AAPL'``).
        amount: Stored on the instance; not read by any method of this class.
        seconds_between: Pause in seconds between trading steps (default 4).
        log: Stored on the instance; not read by any method of this class.
    """

    def __init__(self, **kwargs):
        self.paper_bool = kwargs.get('paper_bool', True)
        self.random_bool = kwargs.get('random', True)
        self.symbol_or_symbols = kwargs.get('symbol_or_symbols', 'AAPL')
        self.amount = kwargs.get('amount', 40000)
        self.seconds_between = kwargs.get('seconds_between', 4)
        self.log = kwargs.get('log')
        self.trading_client, self.trading_stream, self.account = self.open_client()
        # State written by trade_update_handler and polled by make_trades.
        self.trade_update_info = None
        self.order_filled = False
        self.shares_bought = 0
        self.current_symbol = None

    def open_client(self):
        """Create the REST client and the stream, and fetch the account.

        Returns:
            ``(trading_client, trading_stream, account)``. ``account`` is
            ``None`` when the account lookup fails; the failure is logged.
        """
        trading_client = TradingClient(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        # Pre-bind so a failed lookup yields None instead of an
        # UnboundLocalError at the return statement.
        account = None
        try:
            account = trading_client.get_account()
        except Exception as e:
            logger.error(f"Exception in login: {e}")
        return trading_client, trading_stream, account

    async def trade_update_handler(self, data):
        """Record a filled buy order so that make_trades can proceed to sell."""
        logger.info('Trade Update called')
        print("Trade Update:", data)
        if data.event == TradeEvent.FILL and data.order.side == OrderSide.BUY:
            self.order_filled = True
            self.shares_bought = data.order.filled_qty
            self.current_symbol = data.order.symbol

    async def run_stream(self):
        """Subscribe the handler and run the stream on the current event loop."""
        logger.info('Subscribing to trade updates')
        self.trading_stream.subscribe_trade_updates(self.trade_update_handler)
        logger.info('Preparing stream')
        # The private coroutine is awaited directly so that the stream shares
        # this event loop with make_trades.
        await self.trading_stream._run_forever()

    async def stop_stream(self):
        """Ask the stream owned by this instance to shut down."""
        logger.info('Stopping stream')
        # Stop the stream that is actually running; a freshly constructed
        # TradingStream has no connection to close. stop_ws() is the awaitable
        # shutdown request of alpaca-py's TradingStream.
        await self.trading_stream.stop_ws()

    def get_symbol(self):
        """Return the symbol to trade.

        A random element is drawn only when random selection is enabled and
        ``symbol_or_symbols`` is a collection. A plain string is always
        returned whole: ``random.choice('AAPL')`` would yield one character.
        """
        symbols = self.symbol_or_symbols
        if self.random_bool and not isinstance(symbols, str):
            return random.choice(symbols)
        return symbols

    def buy(self):
        """Submit a one-share market buy order.

        Returns:
            ``(symbol, order)`` on success, ``None`` when submission fails
            (the failure is logged).
        """
        symbol = self.get_symbol()
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=1,
            side=OrderSide.BUY,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_buy = self.trading_client.submit_order(
                order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to buy {symbol}: {e}")
            return None
        return symbol, market_order_buy

    def sell(self, symbol):
        """Submit a market sell order for the quantity recorded by the last fill.

        Returns:
            The submitted order, or ``None`` when no filled quantity has been
            recorded or the submission fails (both cases are logged).
        """
        # Sell what was bought rather than a fixed lot size. The recorded
        # quantity is converted explicitly because it is copied verbatim from
        # the fill event.
        try:
            shares = float(self.shares_bought)
        except (TypeError, ValueError):
            shares = 0.0
        if shares <= 0:
            logger.error(f"Failed to sell {symbol}: no filled shares recorded")
            return None
        try:
            market_order_data = MarketOrderRequest(
                symbol=symbol,
                qty=shares,
                side=OrderSide.SELL,
                time_in_force=TimeInForce.DAY
            )
            market_order_sell = self.trading_client.submit_order(
                order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to sell {symbol}: {e}")
            return None
        return market_order_sell

    async def make_trades(self):
        """Repeat buy -> wait for fill -> sell until the cut-off time.

        Always returns ``None``. The loop is abandoned as soon as a buy or a
        sell cannot be submitted.
        """
        # Cut-off is 14:00 today as a naive local datetime.
        # NOTE(review): presumably market close in the author's timezone - confirm.
        market_close = datetime.datetime.now().replace(hour=14, minute=0, second=0, microsecond=0)
        seconds = self.seconds_between
        while datetime.datetime.now() < market_close:
            try:
                bought = self.buy()
            except Exception as e:
                logger.error(f"Failed to buy during trade: {e}")
                return None
            if bought is None:
                # buy() has already logged the cause.
                return None
            symbol, market_order_buy = bought
            print(f"Bought {symbol}: {market_order_buy}")

            while not self.order_filled:
                logger.info('Waiting for order status update')
                await asyncio.sleep(1)
            # Every pause must yield to the event loop: a blocking sleep here
            # would also freeze the stream and its update handler.
            await asyncio.sleep(seconds)

            try:
                market_order_sell = self.sell(symbol=symbol)
            except Exception as e:
                logger.error(f"Failed to sell during trade: {e}")
                return None
            if market_order_sell is None:
                # sell() has already logged the cause; do not keep buying.
                return None
            print(f"Sold {self.current_symbol}: {market_order_sell}")

            self.order_filled = False
            self.shares_bought = 0
            await asyncio.sleep(seconds)
        print('Market closed, shutting down.')
class Command(BaseCommand):
    """Management command that runs a TradeMaker until it finishes or is interrupted."""

    help = (
        "This bot trades the target stock. If you want it to choose randomly, "
        "pass it a list and set the variable random=True\n"
    )
    model = None

    @staticmethod
    def _str_to_bool(value):
        """Convert a command-line value to a bool.

        argparse's ``type=bool`` treats every non-empty string, including
        ``"False"``, as ``True``, so the flag text is parsed explicitly.

        Raises:
            ValueError: if the text is not a recognised boolean spelling
                (argparse reports this as an invalid argument value).
        """
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in ('true', 't', 'yes', 'y', '1'):
            return True
        if text in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise ValueError(f"expected a boolean, got {value!r}")

    def add_arguments(self, parser):
        """Register the command-line options of the bot."""
        parser.add_argument(
            '--paper',
            type=self._str_to_bool,
            help='Set false to live trade.',
            default=True
        )
        parser.add_argument(
            '--folder',
            type=str,
            help='source folder for files',
            default=''
        )
        parser.add_argument(
            '--symbol',
            type=str,
            help='target symbol, or comma-separated list of symbols',
            default='AAPL'
        )
        parser.add_argument(
            '--random',
            type=self._str_to_bool,
            help="Set to true if passing a list of symbols to choose randomly from.",
            default=False
        )
        parser.add_argument(
            '--tradevalue',
            type=int,
            help="The amount the bot should trade. e.g. $40000",
            default=40000
        )
        parser.add_argument(
            '--seconds',
            type=int,
            help="The number of seconds the bot should wait between each trade.",
            default=4
        )

    def handle(self, **options):
        """Build the TradeMaker from the options and drive it on an event loop."""
        paper_bool = options['paper']
        random_bool = options['random']
        symbol_or_symbols = options['symbol']
        amount = options['tradevalue']
        seconds_between = options['seconds']
        log = options['folder']

        # With random selection the bot needs a real collection to draw from;
        # a bare string would be sampled character by character.
        if random_bool and isinstance(symbol_or_symbols, str):
            symbol_or_symbols = [
                part.strip() for part in symbol_or_symbols.split(',') if part.strip()
            ]

        tm = TradeMaker(
            paper_bool=paper_bool,
            random=random_bool,
            symbol_or_symbols=symbol_or_symbols,
            amount=amount,
            seconds_between=seconds_between,
            log=log
        )

        loop = asyncio.get_event_loop()
        session = asyncio.gather(
            tm.run_stream(),
            tm.make_trades()
        )
        try:
            loop.run_until_complete(session)
        except KeyboardInterrupt:
            print("Stopped with Interrupt")
        finally:
            # Cancel whatever is still pending so it cannot place further
            # orders while the loop runs again for the shutdown below.
            session.cancel()
            # stop_stream() is a coroutine: calling it without running it on
            # the loop does nothing. It is run exactly once, here.
            try:
                loop.run_until_complete(tm.stop_stream())
            except Exception as e:
                self.stderr.write(f"Failed to stop stream: {e}")
            loop.close()
当我运行该命令时,终端中得到如下输出(出于安全考虑,敏感信息已做脱敏处理):
python manage.py trade_maker_v5
2023-03-30 11:51:48,342 - INFO - Subscribing to trade updates
2023-03-30 11:51:48,342 - INFO - Preparing stream
Bought AAPL: id=UUID('foo') client_order_id='bar' created_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995853, tzinfo=datetime.timezone.utc) updated_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995921, tzinfo=datetime.timezone.utc) submitted_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 994623, tzinfo=datetime.timezone.utc) filled_at=None expired_at=None canceled_at=None failed_at=None replaced_at=None replaced_by=None replaces=None asset_id=UUID('foo') symbol='AAPL' asset_class=<AssetClass.US_EQUITY: 'us_equity'> notional=None qty='1' filled_qty='0' filled_avg_price=None order_class=<OrderClass.SIMPLE: 'simple'> order_type=<OrderType.MARKET: 'market'> type=<OrderType.MARKET: 'market'> side=<OrderSide.BUY: 'buy'> time_in_force=<TimeInForce.DAY: 'day'> limit_price=None stop_price=None status=<OrderStatus.PENDING_NEW: 'pending_new'> extended_hours=False legs=None trail_percent=None trail_price=None hwm=None
2023-03-30 11:51:48,480 - INFO - Waiting for order status update
2023-03-30 11:51:49,493 - INFO - Waiting for order status update
2023-03-30 11:51:50,500 - INFO - Waiting for order status update
如果我在机器人运行时在另一个终端中单独运行 webhook,它就会起作用。我可以运行以下代码:
from alpaca.trading.stream import TradingStream
# Stream created with paper=True. ALPACA_ID and ALPACA_KEY are not defined in
# this snippet and have to be provided by the surrounding module.
trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=True)
async def update_handler(data):
    """Print every trade update handed to this coroutine."""
    print(data)
# Register the coroutine as the trade-update callback, then start the stream.
trading_stream.subscribe_trade_updates(update_handler)
# NOTE(review): run() presumably blocks until the stream is stopped - confirm.
trading_stream.run()
在我的机器人运行期间,这段代码会打印出所有数据。为什么它单独运行时可以正常工作,放在协程里却不行?
问题是由 Django 引起的。把机器人从 Django 中移出、改成独立脚本,并补充了更多异步语句之后,它现在可以正常工作了。
Django 最近在 4.2 中添加了异步支持,但该项目运行的是 Django 4.1。
如果有帮助的话:我遇到过类似的问题,我是通过把 trading_stream._run_forever() 包装在一个异步函数中来解决的。代码如下:
from alpaca.trading.stream import TradingStream
from alpaca.trading.client import TradingClient
from alpaca.trading.requests import LimitOrderRequest
import asyncio
# Credentials are intentionally left blank; fill them in before running.
API_KEY = ''
API_SECRET = ''


async def update_handler(data):
    """Print every trade update delivered by the stream."""
    # trade updates will arrive in our async handler
    print(data)


async def run_stream(trading_stream):
    """Run the stream on the current event loop via its private coroutine."""
    await trading_stream._run_forever()


async def main():
    """Start the stream in the background, submit one test order, then wait on the stream."""
    trading_stream = TradingStream(API_KEY, API_SECRET, paper=True)
    trading_stream.subscribe_trade_updates(update_handler)

    # Start the stream in a separate task and wait 3 seconds for the ws connection to establish.
    # The task is kept in a variable: the event loop holds only a weak
    # reference to tasks, so a discarded task may be garbage-collected mid-run.
    stream_task = asyncio.create_task(run_stream(trading_stream))
    await asyncio.sleep(3)

    # Send an order to check if we receive events
    trading_client = TradingClient(API_KEY, API_SECRET, paper=True)
    order = LimitOrderRequest(symbol="TSLA", qty=1, side="buy", type="limit", limit_price=150, time_in_force="day", extended_hours=True)
    trading_client.submit_order(order_data=order)
    print("order sent")

    # Keep the main coroutine alive for as long as the stream runs; awaiting
    # the task also surfaces any exception the stream raises.
    await stream_task


if __name__ == "__main__":
    asyncio.run(main())