我问这个问题的原因是因为 Bloomberg 通过 BLPAPI 发送数据的方式。继这篇post之后,我想建立一种有效的方法来获取特定字段的值。由于数据发送方式的本质意味着 session.nextEvent() 中可以有多个消息(msg),并且发送了多余的数据,因此数据多于请求的数据,我想知道是否有一种已知的有效方法这样做。到目前为止,我使用的技术和方法意味着,对于 60 种证券和 5 种订阅,数据永远不会实时,因为它滞后,我相信原因是我管理传入数据的方式。下面有一些示例表示一个示例一种证券的下标。鉴于 MKTDATA_EVENT_TYPE 和 MKTDATA_EVENT_SUBTYPE 可能不同,我正在努力寻找一种有效的方法来做到这一点。
我的目标是尽可能避免 for 循环,并选择字典来引导我找到想要的值。
import blpapi
from bloomberg import BloombergSessionHandler
# session = blpapi.Session()
host='localhost'
port=8194
session_options = blpapi.SessionOptions()
session_options.setServerHost(host)
session_options.setServerPort(port)
session_options.setSlowConsumerWarningHiWaterMark(0.05)
session_options.setSlowConsumerWarningLoWaterMark(0.02)
session = blpapi.Session(session_options)
if not session.start():
print("Failed to start Bloomberg session.")
subscriptions = blpapi.SubscriptionList()
fields = ['BID','ASK','TRADE','LAST_PRICE','LAST_TRADE']
subscriptions.add('GB00BLPK7110 @UKRB Corp', fields)
session.subscribe(subscriptions)
session.start()
while(True):
event = session.nextEvent()
print("Event type:",event.eventType())
if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
i = 0
for msg in event:
print("This is msg ", i)
i+=1
print("\n" , "msg is ", msg, "\n")
print(" Message type:",msg.messageType())
eltMsg = msg.asElement();
msgType = eltMsg.getElement('MKTDATA_EVENT_TYPE').getValueAsString();
msgSubType = eltMsg.getElement('MKTDATA_EVENT_SUBTYPE').getValueAsString();
print(" ",msgType,msgSubType)
for fld in fields:
print(" Fields are :", fields)
if eltMsg.hasElement(fld):
print(" ",fld,eltMsg.getElement(fld).getValueAsFloat())
else:
for msg in event:
print(" Message type:",msg.messageType())
我尝试获取我订阅的指定字段的值,但发现我的代码太慢,因此不符合显示实时数据的要求。
def process_subscription_data1(self, session):
while True:
event = session.nextEvent()
print(f"The event is {event}")
if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
print(f"The event type is: {event.eventType()}")
for msg in event:
print(f"The msg is: {msg}")
data = {'instrument': msg.correlationIds()[0].value()}
print(f"The data is: {data}")
# Processing fields efficiently
for field in self.fields:
print("field is ", field, " ", self.fields)
element = msg.getElement(field) if msg.hasElement(field) else None
print("element is ", element)
data[field] = element.getValueAsString() if element and not element.isNull() else 'N/A'
print(f"Emitting data for {data}")
self.data_signal.emit(data) # Emit data immediately for each message
^^ 我尝试过的代码太慢了(即使没有打印语句,它们也只是显示了代码是多么复杂)
“限制”来自 Bloomberg 的实时报价的速率的一种方法是在添加订阅时指定
interval
:
subs = blpapi.SubscriptionList()
flds = ['BID','ASK','TRADE','LAST_TRADE','LAST_PRICE']
tickers = ['RXA Comdty','GB00BLPK7110 @UKRB Corp']
nId = 0
for t in tickers:
subs.add(t,flds,options={'interval':1},correlationId=blpapi.CorrelationId(nId))
nId += 1
session.subscribe(subs)
这将消息间隔限制为 1 秒(在本例中),并且每条消息将包含给定代码的所有数据的“摘要”。每条消息都会更大,因为它将包含所有内容,而不仅仅是您指定的字段。如果某些股票不如其他股票活跃,您可以为每个订阅指定不同的时间间隔。
如果您正在构建一个 GUI,那么这将是事件驱动的并具有消息循环。转向处理 Bloomberg 事件的
asynchronous
/回调方法可能是有意义的。这里的消息是在单独的工作线程中处理的,如果您感兴趣的事件,您只需要提醒 GUI。
import blpapi
import threading
from queue import Queue
def processEvent(evt,session):
et = evt.eventType()
if et == blpapi.Event.SESSION_STATUS:
print('Session Status event')
for msg in evt:
print(' ',msg.messageType())
if msg.messageType() == 'SessionStarted':
sessionReady.set()
return
if et == blpapi.Event.SUBSCRIPTION_STATUS:
print('Subscription Status event')
for msg in evt:
cId = msg.correlationId()
print(' ',msg.messageType(),'for ticker:',tickers[cId.value()])
return
if et == blpapi.Event.SUBSCRIPTION_DATA:
for msg in evt:
cId = msg.correlationId()
tkr = tickers[cId.value()]
eltMsg = msg.asElement()
for f in flds:
if eltMsg.hasElement(f):
v = eltMsg.getElement(f).getValueAsFloat()
tick = (tkr,{f:v})
qTicks.put(tick)
return
qTicks = Queue()
sessionReady = threading.Event()
session = blpapi.Session(eventHandler=processEvent)
print('Waiting for sesssion to start')
session.startAsync()
sessionReady.wait()
print('Session started')
subs = blpapi.SubscriptionList()
flds = ['BID','ASK','TRADE','LAST_TRADE','LAST_PRICE']
tickers = ['EUR Curncy','RXA Comdty','GB00BLPK7110 @UKRB Corp']
nId = 0
for t in tickers:
subs.add(t,flds,correlationId=blpapi.CorrelationId(nId))
nId += 1
session.subscribe(subs)
while True:
try:
tick = qTicks.get(True)
print(tick)
except:
print('Terminating')
break
session.stopAsync()
一个问题是
startAsync
会立即返回,因此当您尝试添加订阅时,您的 Session
可能尚未准备好。解决这个问题的一种方法是等待 SessionStarted
消息并发出 python 事件信号。在开始使用会议之前,请先了解一下此活动。我不知道 GUI 可能有什么样的消息循环。在此示例中,我使用 python wait
将刻度数据从工作线程发送到主线程(Ctrl-C 将生成异常并结束循环)。另一种选择是将消息“发布”到主线程的消息队列。
Queue
回调可用于决定何时向主线程触发事件:例如,彭博消息包含每个字段的(命名不一致的)最后更新时间,这些可用于确定消息的数据自此以来是否已更改最后一个刻度:您只会发出新数据刻度的信号,从而减少 GUI 中的屏幕更新量。