我正在尝试在 asyncio 模块中将异步线程与 TWS API 结合使用。看来我可以正常工作,但我只需要在主函数中收集线程,以便它们都在同一个事件循环上运行?感谢帮助!错误和代码如下。我还在启动 websocket 线程之前运行了 subscribe 函数并且它有效。只是当我尝试在不同的线程上运行它时。
class TradingApp(EWrapper, EClient):
def __init__(self):
EClient.__init__(self,self)
def position(self, account, contract, position, avgCost):
global master_df
global master_sub_req
super().position(account, contract, position, avgCost)
print("Position.", "Account:", account, "Symbol:", contract.symbol, "SecType:",
contract.secType, "Currency:", contract.currency,
"Position:", decimalMaxString(position), "Avg cost:", floatMaxString(avgCost))
row = {'Contract ID': contract.conId, 'Contract Symbol': contract.symbol, 'Position': position, 'Avg cost':avgCost}
master_df = pd.concat((master_df, pd.DataFrame([row])))
subscribe(contract.conId, contract)
def positionEnd(self):
print('position end')
def pnlSingle(self, reqId, pos, dailyPnL, unrealizedPnL, realizedPnL, value):
global master_df
global master_sub_req
super().pnlSingle(reqId, pos, dailyPnL, unrealizedPnL, realizedPnL, value)
print("Daily PnL Single. ReqId:", reqId, "Position:", decimalMaxString(pos),
"DailyPnL:", floatMaxString(dailyPnL), "UnrealizedPnL:", floatMaxString(unrealizedPnL),
"RealizedPnL:", floatMaxString(realizedPnL), "Value:", floatMaxString(value))
def tickPrice(self, reqId, tickType, price, attrib):
global master_df
super().tickPrice(reqId, tickType, price, attrib)
print(attrib)
print("TickPrice. TickerId:", reqId, "tickType:", tickType, "Price:", price)
for key, value in subscription_requests_tick_price:
if master_df.loc[master_df['Contract Symbol'] == key]:
master_df.loc[master_df['Contract Symbol'] == key,'Tick Price Request ID'] = value
master_df.loc[master_df['Contract Symbol'] == key, "Market Price"] = price
'''
def error(id, errorCode, errorMsg):
print(errorCode)
print(errorMsg)
async def subscribe(contract_id, contract):
global master_sub_req
master_sub_req += 1
app.reqPnLSingle(master_sub_req, "DU7058034", "", contract_id)
time.sleep(2)
master_sub_req += 1
app.reqMktData(reqId=master_sub_req,
contract=contract,
genericTickList="",
snapshot=False,
regulatorySnapshot=False,
mktDataOptions=[])
time.sleep(2)
'''
def websocket_con():
app.run()
app = TradingApp()
# starting a separate daemon thread to execute the websocket connection
con_thread = threading.Thread(target=websocket_con)
#define threading events to use for app threads
#position_event.set(), .clear(), .wait()
position_event = threading.Event()
pnl_single_event = threading.Event()
contract_details_event = threading.Event()
con_thread.start()
app.connect('127.0.0.1', 9225, clientId = 4)
app.reqPositions()
time.sleep(1)
async def subscribe(contract_id, contract):
while True:
global master_sub_req
master_sub_req += 1
app.reqPnLSingle(master_sub_req, "DU7058034", "", contract_id)
time.sleep(2)
master_sub_req += 1
app.reqMktData(reqId=master_sub_req,
contract=contract,
genericTickList="",
snapshot=False,
regulatorySnapshot=False,
mktDataOptions=[])
asyncio.sleep(2)
subscribe_thread = threading.Thread(target=subscribe, args =(app,))
subscribe_thread.start()
time.sleep(1)
这里有错误!
Exception in thread Thread-3 (subscribe):
Traceback (most recent call last):
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.3056.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
self.run()
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.3056.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
TypeError: subscribe() missing 1 required positional argument: 'contract'
[Finished in 3.1s]
您的错误与线程或事件循环无关。问题是错误指出的内容:
TypeError:subscribe() 缺少 1 个必需的位置参数:'contract'线路有几个问题:
asyncio
subscribe_thread = threading.Thread(target=subscribe, args =(app,))
。根据您对
subscribe
的定义,它需要两个参数:subscribe
和 contract_id
。你只是通过了它contract
。
app
是一个
协程。它不能在
subscribe
事件循环之外直接调用。因此,即使您修复了上面一行中的参数错误,当您尝试启动线程时,您也会收到新的错误,因为您无法直接运行asyncio
。
subscribe
来完成什么。据我所知,
TWS API没有
asyncio
实现或模块。asyncio
事件循环必须在其自己的线程上运行,并且除了协程和任务之外,不应在那里运行其他代码(在几乎所有情况下)。如果你想在自己的线程上启动事件循环,那么你需要调用像
asyncio
这样的东西作为线程的目标,并将其传递给一个主协程作为参数来运行。在这里查看答案: