我正在使用 BLPAPI 运行订阅会话,并且能够获取实时数据,但我想从(广泛的)字段列表中分离出特定值。然后我想将其放入带有“Ticker”和“Field”的数据框中,例如
股票代码 | 最后价格 |
---|---|
ESM3指数 | 4138.25 |
import blpapi
session = blpapi.Session()
session.start()
subscriptions = blpapi.SubscriptionList()
subscriptions.add("ESM3 Index", "LAST_PRICE", " ",)
session.subscribe(subscriptions)
while(True):
event = session.nextEvent()
for msg in event:
print(msg)
其结果是:
CID: {[ valueType=AUTOGEN classId=0 value=1 ]}
MarketDataEvents = {
MKTDATA_EVENT_TYPE = TRADE
MKTDATA_EVENT_SUBTYPE = NEW
EVT_TRADE_DATE_RT = 2023-04-07
NUM_TRADES_RT = 21535
PER_TRADE_VWAP_VOLUME_RT = 59291.000000
PER_TRADE_VWAP_TURNOVER_RT = 244947593.500000
PER_TRADE_VWAP_REALTIME = 4131.277800
LAST_ALL_SESSIONS = 4138.250000
LAST2_TRADE = 4138.250000
LAST_PRICE = 4138.250000
LAST_PRICE_TDY = 4138.250000
LAST2_DIR = -1
LAST_TICK_DIRECTION_RT = 2
LAST_TRADE = 4138.250000
SIZE_LAST_TRADE = 2
SIZE_LAST_TRADE_TDY = 2
TRADE_SIZE_ALL_SESSIONS_RT = 2
ALL_PRICE_SIZE = 2
ALL_PRICE_COND_CODE = "TSUM"
ALL_PRICE = 4138.250000
VOLUME = 59598
VOLUME_TDY = 59598
REALTIME_VOLUME_5_DAY_INTERVAL =
DELTA_AVAT_1_DAY_INTERVAL =
DELTA_AVAT_5_DAY_INTERVAL =
DELTA_AVAT_10_DAY_INTERVAL =
DELTA_AVAT_20_DAY_INTERVAL =
DELTA_AVAT_30_DAY_INTERVAL =
DELTA_AVAT_100_DAY_INTERVAL =
DELTA_AVAT_180_DAY_INTERVAL =
LAST_PRICE_COND_CODE_RT = "TSUM"
PRICE_CHANGE_1Y_NET_RT = -402.000000
PRICE_CHANGE_1Y_PCT_RT = -8.854100
LAST_CONTINUOUS_TRADE_PRICE_RT = 4138.250000
PRICE_LAST_RT = 4138.250000
LAST_TRADE_PRICE_TODAY_RT = 4138.250000
EVT_TRADE_PRICE_RT = 4138.250000
EVT_TRADE_SIZE_RT = 2
EVT_TRADE_CONDITION_CODE_RT = "TSUM"
我希望能够提取特定的股票代码,例如“LAST_PRICE”
我尝试过使用 msg.getElement("LAST_PRICE") 但它不喜欢这样,因为它说找不到子元素。
您没有按类型过滤事件。彭博会议将发送许多不同类型的事件。
在OP的代码中,当会话打开时,前两个事件将是SESSION_STATUS(=2),然后当服务打开时,将是SERVICE_STATUS(=9)事件。之后,您会收到 SUBSCRIPTION_STATUS(=3) 事件。最后,您开始接收类型为 SUBSCRIPTION_DATA(=8) 的报价数据:这些是您关心并想要处理的数据。
即使您获得市场数据事件,它们也具有不同的类型和子类型,并且每个事件可能有也可能没有您正在寻找的数据项。例如,BID 子类型可能没有 ASK 字段。
这是一个简短的测试应用程序,它允许您询问每个刻度事件,并决定如何处理它:
import blpapi
session = blpapi.Session()
session.start()
subscriptions = blpapi.SubscriptionList()
fields = ['BID','ASK','TRADE','LAST_PRICE','LAST_TRADE','EVT_TRADE_PRICE_RT','EVT_TRADE_SIZE_RT']
subscriptions.add('SFRM3 Index', fields)
session.subscribe(subscriptions)
while(True):
event = session.nextEvent()
print("Event type:",event.eventType())
if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
for msg in event:
print(" Message type:",msg.messageType())
eltMsg = msg.asElement();
msgType = eltMsg.getElement('MKTDATA_EVENT_TYPE').getValueAsString();
msgSubType = eltMsg.getElement('MKTDATA_EVENT_SUBTYPE').getValueAsString();
print(" ",msgType,msgSubType)
for fld in fields:
if eltMsg.hasElement(fld):
print(" ",fld,eltMsg.getElement(fld).getValueAsFloat())
else:
for msg in event:
print(" Message type:",msg.messageType())
您可能会看到类似这样的输出:
Event type: 2
Message type: SessionConnectionUp
Event type: 2
Message type: SessionStarted
Event type: 9
Message type: ServiceOpened
Event type: 3
Message type: SubscriptionStarted
Message type: SubscriptionStreamsActivated
Event type: 8
Message type: MarketDataEvents
SUMMARY INITPAINT
BID 95.06
ASK 95.065
Event type: 8
Message type: MarketDataEvents
SUMMARY INITPAINT
BID 95.06
ASK 95.065
LAST_PRICE 95.06
LAST_TRADE 95.06
Event type: 8
Message type: MarketDataEvents
TRADE NEW
EVT_TRADE_PRICE_RT 95.06
EVT_TRADE_SIZE_RT 2.0
Message type: MarketDataEvents
TRADE NEW
EVT_TRADE_PRICE_RT 95.06
EVT_TRADE_SIZE_RT 3.0
Message type: MarketDataEvents
TRADE NEW
EVT_TRADE_PRICE_RT 95.06
EVT_TRADE_SIZE_RT 1.0
Event type: 8
Message type: MarketDataEvents
TRADE NEW
EVT_TRADE_PRICE_RT 95.06
EVT_TRADE_SIZE_RT 2.0
Event type: 8
Message type: MarketDataEvents
QUOTE ASK
ASK 95.065
Event type: 8
Message type: MarketDataEvents
QUOTE ASK
ASK 95.065
需要注意的一件重要事情是,单个事件可能有多个消息,并且某些消息(如 INITPAINT)可能会重复。
扩展类似于 @DS_London 的代码,但将您对数据放入数据框中的需求纳入其中。尽管不是我对这篇post的需求所强调的最有效的代码,但我认为它仍然是有价值的,并且会在我改进它时进行更新。
import blpapi
import pandas as pd
import numpy as np
import os
# initialise bloomberg
host='localhost'
port=8194
session_options = blpapi.SessionOptions()
session_options.setServerHost(host)
session_options.setServerPort(port)
session_options.setSlowConsumerWarningHiWaterMark(0.05)
session_options.setSlowConsumerWarningLoWaterMark(0.02)
session = blpapi.Session(session_options)
if not session.start():
print("Failed to start Bloomberg session.")
session.start()
subscriptions = blpapi.SubscriptionList()
# sample selection of securities and fields
fields = ['BID','ASK','TRADE','LAST_PRICE']
subscriptions.add('VOD LN Equity', fields, "", blpapi.CorrelationId('VOD LN Equity'))
subscriptions.add('AZN LN Equity', fields, "", blpapi.CorrelationId('AZN LN Equity'))
session.subscribe(subscriptions)
ColPosDict = {'BID':0,
'ASK':1,
'TRADE':2,
'LAST_PRICE':3}
RowPosDict = {'VOD LN Equity':0,
'AZN LN Equity':1}
df_AllData = pd.DataFrame(np.nan, index=list(['VOD LN Equity' ,'AZN LN Equity']), columns= fields)
while(True):
event = session.nextEvent()
if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
for msg in event:
security = msg.correlationIds()[0].value()
eltMsg = msg.asElement()
for fld in fields:
if eltMsg.hasElement(fld):
df_AllData.iat[RowPosDict[security], ColPosDict[fld]] = eltMsg.getElement(fld).getValueAsFloat()
os.system('cls' if os.name == 'nt' else 'clear')
print(df_AllData)
输出示例:
BID ASK TRADE LAST_PRICE
VOD LN Equity 69.72 69.76 NaN 69.72
AZN LN Equity 11200.00 11204.00 NaN 11200.00