使用 BLPAPI 检索指定字段的值的快速/高效方法

问题描述 投票:0回答:1

我问这个问题的原因是因为 Bloomberg 通过 BLPAPI 发送数据的方式。继这篇post之后,我想建立一种有效的方法来获取特定字段的值。由于数据发送方式的本质意味着 session.nextEvent() 中可以有多个消息(msg),并且发送了多余的数据,因此数据多于请求的数据,我想知道是否有一种已知的有效方法这样做。到目前为止,我使用的技术和方法意味着,对于 60 种证券和 5 种订阅,数据永远不会实时,因为它滞后,我相信原因是我管理传入数据的方式。下面有一些示例表示一个示例一种证券的下标。鉴于 MKTDATA_EVENT_TYPE 和 MKTDATA_EVENT_SUBTYPE 可能不同,我正在努力寻找一种有效的方法来做到这一点。

我的目标是尽可能避免 for 循环,并选择字典来引导我找到想要的值。

import blpapi
from bloomberg import BloombergSessionHandler

# session = blpapi.Session()

host='localhost'
port=8194

session_options = blpapi.SessionOptions()
session_options.setServerHost(host)
session_options.setServerPort(port)
session_options.setSlowConsumerWarningHiWaterMark(0.05)
session_options.setSlowConsumerWarningLoWaterMark(0.02)

session = blpapi.Session(session_options)
if not session.start():
    print("Failed to start Bloomberg session.")


subscriptions = blpapi.SubscriptionList()
fields = ['BID','ASK','TRADE','LAST_PRICE','LAST_TRADE']
subscriptions.add('GB00BLPK7110 @UKRB Corp', fields)
session.subscribe(subscriptions)
session.start()


while(True):
    event = session.nextEvent()
    print("Event type:",event.eventType())

    if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA: 
        i = 0
        for msg in event:
            print("This is msg ", i)
            i+=1
            print("\n" , "msg is ", msg, "\n")
            print("  Message type:",msg.messageType())
            eltMsg = msg.asElement();
            msgType = eltMsg.getElement('MKTDATA_EVENT_TYPE').getValueAsString();
            msgSubType = eltMsg.getElement('MKTDATA_EVENT_SUBTYPE').getValueAsString();
            print(" ",msgType,msgSubType)

            for fld in fields:
                print(" Fields are :",  fields)
                if eltMsg.hasElement(fld):
                    print("    ",fld,eltMsg.getElement(fld).getValueAsFloat())
    else:
        for msg in event:
            print("  Message type:",msg.messageType())

我尝试获取我订阅的指定字段的值,但发现我的代码太慢,因此不符合显示实时数据的要求。

    def process_subscription_data1(self, session):
        while True:
            event = session.nextEvent()
            print(f"The event is {event}")
            if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
                print(f"The event type is: {event.eventType()}")
                for msg in event:
                    print(f"The msg is: {msg}")
                    data = {'instrument': msg.correlationIds()[0].value()}
                    print(f"The data is: {data}")
                    # Processing fields efficiently
                    for field in self.fields:
                        print("field is ", field, " ", self.fields)
                        element = msg.getElement(field) if msg.hasElement(field) else None
                        print("element is ", element)
                        data[field] = element.getValueAsString() if element and not element.isNull() else 'N/A'
                    print(f"Emitting data for {data}")
                    self.data_signal.emit(data)  # Emit data immediately for each message

^^ 我尝试过的代码太慢了(即使没有打印语句,它们也只是显示了代码是多么复杂)

python bloomberg blpapi
1个回答
0
投票

“限制”来自 Bloomberg 的实时报价的速率的一种方法是在添加订阅时指定

interval

subs = blpapi.SubscriptionList()
flds = ['BID','ASK','TRADE','LAST_TRADE','LAST_PRICE']
tickers = ['RXA Comdty','GB00BLPK7110 @UKRB Corp']

nId = 0
for t in tickers:
    subs.add(t,flds,options={'interval':1},correlationId=blpapi.CorrelationId(nId))
    nId += 1

session.subscribe(subs)

这将消息间隔限制为 1 秒(在本例中),并且每条消息将包含给定代码的所有数据的“摘要”。每条消息都会更大,因为它将包含所有内容,而不仅仅是您指定的字段。如果某些股票不如其他股票活跃,您可以为每个订阅指定不同的时间间隔。

如果您正在构建一个 GUI,那么这将是事件驱动的并具有消息循环。转向处理 Bloomberg 事件的

asynchronous
/回调方法可能是有意义的。这里的消息是在单独的工作线程中处理的,如果您感兴趣的事件,您只需要提醒 GUI。

import blpapi
import threading
from queue import Queue

def processEvent(evt,session):
    et = evt.eventType()

    if et == blpapi.Event.SESSION_STATUS:
        print('Session Status event')
        for msg in evt:
            print('   ',msg.messageType())
            if msg.messageType() == 'SessionStarted':
                sessionReady.set()
        return
    if et == blpapi.Event.SUBSCRIPTION_STATUS:
        print('Subscription Status event')
        for msg in evt:
            cId = msg.correlationId()
            print('   ',msg.messageType(),'for ticker:',tickers[cId.value()])

        return
    if et == blpapi.Event.SUBSCRIPTION_DATA:
        for msg in evt:
            cId = msg.correlationId()
            tkr = tickers[cId.value()]
            eltMsg = msg.asElement()

            for f in flds:
                if eltMsg.hasElement(f):
                    v = eltMsg.getElement(f).getValueAsFloat()
                    tick = (tkr,{f:v})
                    qTicks.put(tick)
  
        return

qTicks = Queue()
sessionReady = threading.Event() 

session = blpapi.Session(eventHandler=processEvent)

print('Waiting for sesssion to start')
session.startAsync()
sessionReady.wait()
print('Session started')

subs = blpapi.SubscriptionList()
flds = ['BID','ASK','TRADE','LAST_TRADE','LAST_PRICE']
tickers = ['EUR Curncy','RXA Comdty','GB00BLPK7110 @UKRB Corp']

nId = 0
for t in tickers:
    subs.add(t,flds,correlationId=blpapi.CorrelationId(nId))
    nId += 1

session.subscribe(subs)

while True:
    try:
        tick = qTicks.get(True)
        print(tick)
    except:
        print('Terminating')
        break

session.stopAsync()

一个问题是

startAsync
会立即返回,因此当您尝试添加订阅时,您的
Session
可能尚未准备好。解决这个问题的一种方法是等待
SessionStarted
消息并发出 python 事件信号。在开始使用会议之前,请先了解一下此活动。
我不知道 GUI 可能有什么样的消息循环。在此示例中,我使用 python 

wait

将刻度数据从工作线程发送到主线程(Ctrl-C 将生成异常并结束循环)。另一种选择是将消息“发布”到主线程的消息队列。

Queue
回调可用于决定何时向主线程触发事件:例如,彭博消息包含每个字段的(命名不一致的)最后更新时间,这些可用于确定消息的数据自此以来是否已更改最后一个刻度:您只会发出新数据刻度的信号,从而减少 GUI 中的屏幕更新量。
    

© www.soinside.com 2019 - 2024. All rights reserved.