Reputation: 79
All,
I have been using blpapi with Python to query reference data (//blp/refdata) without issues and I've recently been using the streaming endpoint (//blp/mkdata); so my environment is set up (yes you need the C++ libraries/dlls - you have to follow the instrux in the bloomberg portal) and I'm getting data.
My question relates to writing performant code. The //blp/mkdata service will send you a bunch of data fast and if you don't manage it correctly you could end up with code that doesn't respond. Plus you don't want to bog down your machine. That is the main focus with this question. If anyone has had experience with streaming data via the //blp/mkdata service along low 100s tickers and 10-30 fields then I would appreciate your suggestions.
Let me make the question a little clearer. Imagine that ultimately you want the code to do three things (perhaps 3 threads?) as follows:
I used the "demo tool" (BloombergWindowsSDK_3.xx\Demo Tool) to download prices for "T US". There is a button there ({}) that will produce the code for you to use; it works. My concern is how well this code will grow with 100 tickers and say 30 fields?
There are also many examples in the folder BloombergWindowsSDK_3.xx\Python\v3.14.0\examples and although they have some comments I think that it is hard to tell what the uses of each are. It is hard to tell where to start. Below I'm pasting the code generated by the demo tool. I don't think that it uses threads so not sure that it would be wise to expand on it.
Anyway, any suggestions are greatly appreciated, i.e. do you know of a way to make the demo tool code threaded? Or do you have a code example (from the examples folder even) that can stream using //blp/refdata service and run other threads and is relatively performant?
p.s. I saw many posts where people have issues setting up/getting started with blpapi/Bloomberg. Message me and I'll see what I can do to help. I'm over that hump but now want to build a more robust and fast set of functionality.
# SubscriptionWithEventHandlerExample.py
import blpapi
import blpapi.logging
from optparse import OptionParser
import time
EXCEPTIONS = blpapi.Name("exceptions")
FIELD_ID = blpapi.Name("fieldId")
REASON = blpapi.Name("reason")
CATEGORY = blpapi.Name("category")
DESCRIPTION = blpapi.Name("description")
SessionConnectionDown = blpapi.Name("SessionConnectionDown")
SessionConnectionUp = blpapi.Name("SessionConnectionUp")
SessionTerminated = blpapi.Name("SessionTerminated")
ServiceDown = blpapi.Name("ServiceDown")
SlowConsumerWarning = blpapi.Name("SlowConsumerWarning")
SlowConsumerWarningCleared = blpapi.Name("SlowConsumerWarningCleared")
DataLoss = blpapi.Name("DataLoss")
ServiceName = blpapi.Name("serviceName")
# authorization
AUTHORIZATION_SUCCESS = blpapi.Name("AuthorizationSuccess")
AUTHORIZATION_FAILURE = blpapi.Name("AuthorizationFailure")
AUTHORIZATION_REVOKED = blpapi.Name("AuthorizationRevoked")
TOKEN_SUCCESS = blpapi.Name("TokenGenerationSuccess")
TOKEN_FAILURE = blpapi.Name("TokenGenerationFailure")
TOKEN = blpapi.Name("token")
g_session = None
g_sessionStarted = False
g_subscriptions = None
g_identity = None
g_authCorrelationId = None
class CorrelationInfo:
def __init__(self, topic):
self.topic = topic
def getTopic(self):
return self.topic
class SubscriptionEventHandler(object):
def getTimeStamp(self):
return time.strftime("%Y/%m/%d %X")
#
# Process Session Status
#
# session connected message
#SessionConnectionUp = {
# server = "127.0.0.1:8194"
#}
# session started message
#SessionStarted = {}
# session disconnected message
#SessionConnectionDown = {
# server = "127.0.0.1:8194"
#}
# session terminated message
#SessionTerminated = {
# reason = {
# }
#}
def processSessionStatus(self, event):
timeStamp = self.getTimeStamp()
print("Processing SESSION_STATUS")
for msg in event:
if msg.messageType() == SessionConnectionDown:
# API connection disconnect (Session still active)
print("%s: Session connection down detected: %s" % (timeStamp, msg))
elif msg.messageType() == SessionConnectionUp:
# API connection up (Session still active)
print("%s: Session connection up detected: %s" % (timeStamp, msg))
elif msg.messageType() == SessionTerminated:
# Session no longer active
print("%s: Session terminated!: %s" % (timeStamp, msg))
g_sessionStarted = False
else:
# misc messages
print("%s: Misc session message: %s" % (timeStamp, msg))
def processServiceStatus(self, event):
timeStamp = self.getTimeStamp()
print("Processing SESSION_STATUS")
for msg in event:
if msg.messageType() == ServiceDown:
if msg.hasElement(ServiceName, True):
if msg.getElement(ServiceName).getValueAsString() == "//blp/mktdata":
# API connection disconnect (Session still active)
print("%s: //blp/mktdata service down detected: %s" % (timeStamp, msg))
else:
# misc messages
print("%s: Misc session message: %s" % (timeStamp, msg))
def processSubscriptionStatus(self, event):
timeStamp = self.getTimeStamp()
print ("Processing SUBSCRIPTION_STATUS")
for msg in event:
cInfo = msg.correlationIds()[0].value()
print ("%s: %s - %s" % (timeStamp, cInfo.getTopic(), msg.messageType()))
print (msg)
if msg.hasElement(REASON):
# This can occur on SubscriptionFailure.
reason = msg.getElement(REASON)
if msg.hasElement(CATEGORY) and msg.hasElement(DESCRIPTION):
print (" %s: %s" % (
reason.getElement(CATEGORY).getValueAsString(),
reason.getElement(DESCRIPTION).getValueAsString()))
if msg.hasElement(EXCEPTIONS):
# This can occur on SubscriptionStarted if at least
# one field is good while the rest are bad.
exceptions = msg.getElement(EXCEPTIONS)
for exInfo in exceptions.values():
fieldId = exInfo.getElement(FIELD_ID)
reason = exInfo.getElement(REASON)
print (" %s: %s" % (
fieldId.getValueAsString(),
reason.getElement(CATEGORY).getValueAsString()))
def processSubscriptionDataEvent(self, event):
timeStamp = self.getTimeStamp()
print ()
print ("Processing SUBSCRIPTION_DATA")
for msg in event:
cInfo = msg.correlationIds()[0].value()
print ("%s: %s - %s" % (timeStamp, cInfo.getTopic(), msg.messageType()))
for field in msg.asElement().elements():
if field.numValues() < 1:
print (" %s is NULL" % field.name())
continue
# Assume all values are scalar.
print( " %s = %s" % (field.name(),
field.getValueAsString()))
def processAdmin(self, event):
timeStamp = self.getTimeStamp()
for msg in event:
if msg.messageType() == SlowConsumerWarning:
print ("!!!! Slow consumer warning !!!! ")
elif msg.messageType() == SlowConsumerWarningCleared:
print ("!!!! Slow consumer warning cleared !!!! ")
else:
print ("%s: %s\n%s" % (timeStamp, msg.messageType(), msg))
def processMiscEvents(self, event):
timeStamp = self.getTimeStamp()
for msg in event:
print ("%s: %s\n%s" % (timeStamp, msg.messageType(), msg))
def processEvent(self, event, session):
try:
if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
return self.processSubscriptionDataEvent(event)
elif event.eventType() == blpapi.Event.SUBSCRIPTION_STATUS:
return self.processSubscriptionStatus(event)
elif event.eventType() == blpapi.Event.SESSION_STATUS:
return self.processSessionStatus(event)
elif event.eventType() == blpapi.Event.SERVICE_STATUS:
return self.processServiceStatus(event)
elif event.eventType() == blpapi.Event.ADMIN:
return self.processAdmin(event)
else:
return self.processMiscEvents(event)
except blpapi.Exception as e:
print ("Library Exception !!! %s" % e.description())
return False
def authorize(p_cid):
is_authorized = None
WAIT_TIME_SECONDS = 10
authService = g_session.getService("//blp/apiauth")
# generate token
identity = g_session.createIdentity()
tokenEventQueue = blpapi.EventQueue()
g_session.generateToken(eventQueue=tokenEventQueue)
# Process token response
ev = tokenEventQueue.nextEvent(WAIT_TIME_SECONDS * 1000)
token = None
if ev.eventType() == blpapi.Event.TOKEN_STATUS:
for msg in ev:
print (msg)
if msg.messageType() == TOKEN_SUCCESS:
token = msg.getElementAsString(TOKEN)
elif msg.messageType() == TOKEN_FAILURE:
break
elif ev.eventType() == blpapi.Event.REQUEST_STATUS:
# request failure
for msg in ev:
print (msg)
elif ev.eventType() == blpapi.Event.TIMEOUT:
print ("Generate token response did not return within the timeout given timeout period")
if not token:
print ("Failed to get token")
else:
# Create and fill the authorithation request
authRequest = authService.createAuthorizationRequest()
authRequest.set(TOKEN, token)
eventQueue = blpapi.EventQueue()
# Send authorithation request to "fill" the Identity
identity = g_session.createIdentity()
g_session.sendAuthorizationRequest(authRequest, identity, p_cid, eventQueue)
# Process related responses
event = eventQueue.nextEvent(WAIT_TIME_SECONDS * 1000)
if event.eventType() == blpapi.Event.RESPONSE or \
event.eventType() == blpapi.Event.PARTIAL_RESPONSE:
if event.eventType() == blpapi.Event.PARTIAL_RESPONSE:
print ("Warning: Received authorization partial response. The authorization requestion should be sent asynchronously")
for msg in event:
print (msg)
if msg.messageType() == AUTHORIZATION_SUCCESS:
is_authorized = identity
else:
print ("Authorization failed")
elif ev.eventType() == blpapi.Event.REQUEST_STATUS:
# request failure
for msg in ev:
print (msg)
elif ev.eventType() == blpapi.Event.TIMEOUT:
print ("Generate token response did not return within the timeout given timeout period")
g_session.cancel(p_cid)
return is_authorized
def subscribe(p_identity):
subscriptionOptions = []
fieldList = []
fieldList.append("VOLUME")
fieldList.append("LAST_PRICE")
subscriptionOptions1 = []
cInfo = CorrelationInfo("T US EQUITY")
cId = blpapi.CorrelationId(cInfo)
g_subscriptions.add(cInfo.getTopic(), fieldList, subscriptionOptions1, cId)
g_session.subscribe(g_subscriptions, p_identity)
# callback for BLPAPI logging
def onMessage(threadId, traceLevel, dateTime, loggerName, message):
print("%s %s [%s] Thread ID = %s %s" %
(dateTime, loggerName, traceLevel, threadId, message))
def main():
global g_session, g_sessionStarted, g_subscriptions , g_identity, g_authCorrelationId
# set BLPAPI log level
blpapi.logging.Logger.registerCallback(onMessage, blpapi.logging.Logger.SEVERITY_OFF)
# Fill SessionOptions
options = blpapi.SessionOptions()
g_subscriptions = blpapi.SubscriptionList()
options.setServerHost("your ip here")
options.setServerPort(port_number)
options.setAuthenticationOptions("AuthenticationMode=APPLICATION_ONLY;ApplicationAuthenticationType=APPNAME_AND_KEY;ApplicationName=LIGHT_STREET:lscode")
print("Session options: %s" % options)
eventHandler = SubscriptionEventHandler()
# Create a Session
g_session = blpapi.Session(options, eventHandler.processEvent)
# Start a Session
if not g_session.start():
print ("Failed to start session.")
return
print ("Connected successfully")
g_sessionStarted = True
service = "//blp/mktdata"
if not g_session.openService(service):
print ("Failed to open %s service" % service)
return
# Open authorization service
if not g_session.openService("//blp/apiauth"):
print("Failed to open //blp/apiauth")
return
# Open authorization service
if not g_session.openService("//blp/apiauth"):
print("Failed to open //blp/apiauth")
return
# Authorize user that is interested in receiving data
g_authCorrelationId = blpapi.CorrelationId("authCorrelation");
g_identity = authorize(g_authCorrelationId)
if g_identity is None:
print ("Exiting...")
return
print ("Subscribing...")
subscribe(g_identity)
try:
# Wait for enter key to exit application
print ("Press ENTER to quit")
# python 2.x
#raw_input()
# python 3.x
input()
finally:
if g_sessionStarted:
g_session.unsubscribe(g_subscriptions)
# Stop the session
g_session.stop()
time.sleep(1)
if __name__ == "__main__":
print ("SubscriptionWithEventHandlerExample")
try:
main()
except KeyboardInterrupt:
print ("Ctrl+C pressed. Stopping...")
__copyright__ = """
Copyright 2018. Bloomberg Finance L.P.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to
deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
sell copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: The above
copyright notice and this permission notice shall be included in all copies
or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
"""
Upvotes: 1
Views: 413
Reputation: 356
Does your use case require every tick ? If not, then one possibility is to go intervalised/conflated with the subscription option of "?interval=N.N" (eg 0.5) which means you'll get an event if any of the specified fields change value within (example) half a second (or whatever period you specify). In the Subscription code you would set SubscriptionOptions to set this. For any subscription you have tickers... fields... and options.
The documentation has plenty of examples of using "interval=" as an option. Also be conscious that you will have to process MKTDATA_EVENT_SUBTYPE = "INTERVAL", and you will not get TRADE or QUOTE events.
Alternatively, if you're going full tick, then focus on a 'queue' pattern, whereby you pull the events and place them on an internal queue, and then have another function that is pulling them off to the queue as and when. This should smooth out the load.
Upvotes: 0