如何在Python中将asyncio模块与TWS API一起使用?

问题描述 投票:0回答:1

我正在尝试在 asyncio 模块中将异步线程与 TWS API 结合使用。看来我可以正常工作,但我只需要在主函数中收集线程,以便它们都在同一个事件循环上运行?感谢帮助!错误和代码如下。我还在启动 websocket 线程之前运行了 subscribe 函数并且它有效。只是当我尝试在不同的线程上运行它时。

class TradingApp(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self,self)

    def position(self, account, contract, position, avgCost):
        
        global master_df

        global master_sub_req

        super().position(account, contract, position, avgCost)

        print("Position.", "Account:", account, "Symbol:", contract.symbol, "SecType:",
              contract.secType, "Currency:", contract.currency,
              "Position:", decimalMaxString(position), "Avg cost:", floatMaxString(avgCost))

        row = {'Contract ID': contract.conId, 'Contract Symbol': contract.symbol, 'Position': position, 'Avg cost':avgCost}

        master_df = pd.concat((master_df, pd.DataFrame([row])))
        
        subscribe(contract.conId, contract)

    def positionEnd(self):

        print('position end')
    
    def pnlSingle(self, reqId, pos, dailyPnL, unrealizedPnL, realizedPnL, value):

        global master_df

        global master_sub_req

        super().pnlSingle(reqId, pos, dailyPnL, unrealizedPnL, realizedPnL, value)

        print("Daily PnL Single. ReqId:", reqId, "Position:", decimalMaxString(pos),
              "DailyPnL:", floatMaxString(dailyPnL), "UnrealizedPnL:", floatMaxString(unrealizedPnL),
              "RealizedPnL:", floatMaxString(realizedPnL), "Value:", floatMaxString(value))

    def tickPrice(self, reqId, tickType, price, attrib):    
        
        global master_df

        super().tickPrice(reqId, tickType, price, attrib)

        print(attrib)

        print("TickPrice. TickerId:", reqId, "tickType:", tickType, "Price:", price)

        for key, value in subscription_requests_tick_price:

            if master_df.loc[master_df['Contract Symbol'] == key]:

                master_df.loc[master_df['Contract Symbol'] == key,'Tick Price Request ID'] = value

                master_df.loc[master_df['Contract Symbol'] == key, "Market Price"] = price
'''
    def error(id, errorCode, errorMsg):

        print(errorCode)

        print(errorMsg)


async def subscribe(contract_id, contract):

    global master_sub_req

    master_sub_req += 1

    app.reqPnLSingle(master_sub_req, "DU7058034", "", contract_id)

    time.sleep(2)
    
    master_sub_req += 1

    app.reqMktData(reqId=master_sub_req, 
                contract=contract,
                genericTickList="",
                snapshot=False,
                regulatorySnapshot=False,
                mktDataOptions=[])

    time.sleep(2)
'''

def websocket_con():
    app.run()

app = TradingApp()

# starting a separate daemon thread to execute the websocket connection
con_thread = threading.Thread(target=websocket_con)

#define threading events to use for app threads
#position_event.set(), .clear(), .wait()

position_event = threading.Event()

pnl_single_event = threading.Event()

contract_details_event = threading.Event()

con_thread.start()

app.connect('127.0.0.1', 9225, clientId = 4)

app.reqPositions()

time.sleep(1)

async def subscribe(contract_id, contract):

    while True:

        global master_sub_req

        master_sub_req += 1

        app.reqPnLSingle(master_sub_req, "DU7058034", "", contract_id)

        time.sleep(2)
    
        master_sub_req += 1

        app.reqMktData(reqId=master_sub_req, 
                contract=contract,
                genericTickList="",
                snapshot=False,
                regulatorySnapshot=False,
                mktDataOptions=[])

        asyncio.sleep(2)

subscribe_thread = threading.Thread(target=subscribe, args =(app,))

subscribe_thread.start()

time.sleep(1)

这里有错误!


Exception in thread Thread-3 (subscribe):
Traceback (most recent call last):
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.3056.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.3056.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
TypeError: subscribe() missing 1 required positional argument: 'contract'
[Finished in 3.1s]
python multithreading asynchronous python-asyncio tws
1个回答
0
投票

您的错误与线程或事件循环无关。问题是错误指出的内容:


TypeError:subscribe() 缺少 1 个必需的位置参数:'contract'

线路有几个问题:

asyncio

    您没有将正确的参数传递给
  1. subscribe_thread = threading.Thread(target=subscribe, args =(app,))

    。根据您对

    subscribe
    的定义,它需要两个参数:
    subscribe
    contract_id
    。你只是通过了它
    contract
    
    

  2. app

    是一个

    协程
    。它不能在 subscribe 事件循环之外直接调用。因此,即使您修复了上面一行中的参数错误,当您尝试启动线程时,您也会收到新的错误,因为您无法直接运行
    asyncio
    
    

总的来说,并不完全清楚你想用
subscribe

来完成什么。据我所知,

TWS API
没有 asyncio 实现或模块。

asyncio

事件循环必须在其自己的线程上运行,并且除了协程和任务之外,不应在那里运行其他代码(在几乎所有情况下)。如果你想在自己的线程上启动事件循环,那么你需要调用像

asyncio
这样的东西作为线程的目标,并将其传递给一个主协程作为参数来运行。在这里查看答案:

    https://stackoverflow.com/a/78256253/17800932
  • https://stackoverflow.com/a/78253105/17800932
© www.soinside.com 2019 - 2024. All rights reserved.