Reputation: 65
I've got a bottleneck with the data and would appreciate some senior advice.
I have an API from which I receive financial data that looks like this:

GBPUSD
2020-01-01 00:00:01.001
1.30256
1.30250

My goal is to write this data into the database as fast as possible.
Inputs:
The incoming data, structured as shown above, arrives in one dictionary: {symbol: {datetime: (price1, price2)}}. All of the values come in as strings.
The API streams 29 symbols, so I can receive, for example, from 30 to 60+ values of different symbols in a single second.
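For illustration, a single incoming message in that structure might look like this (values made up):

incoming = {
    'GBPUSD': {
        '2020-01-01 00:00:01.001': ('1.30256', '1.30250')
    }
}
# every field, including the prices, is a string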
How it works now:
Each incoming value goes into data_dict, and for data_dict[symbol][last_value] a job is enqueued: enqueue(save_record, args=(datetime, price1, price2)). Up to this point everything works fine and fast.
from sqlalchemy import create_engine, MetaData, Table

def save_record(Datetime, price1, price2, Instr, adf):
    # Parameters
    # ----------
    # Datetime : string : datetime value
    # price1   : string : bid value
    # price2   : string : ask value
    # Instr    : string : symbol (table name) to save into
    # adf      : string : credentials / connection string for the database engine
    #
    # Result: executes the INSERT against the database
    engine = create_engine(adf)
    meta = MetaData(bind=engine, reflect=True)
    table_obj = Table(Instr, meta)
    insert_state = table_obj.insert().values(Datetime=Datetime, price1=price1, price2=price2)
    with engine.connect() as conn:
        conn.execute(insert_state)
When the last line of the function executes, it takes from 0.5 to 1 second to write the row into the database:
12:49:23 default: DT.save_record('2020-00-00 00:00:01.414538', 1.33085, 1.33107, 'USDCAD', 'postgresql cred') (job_id_1)
12:49:24 default: Job OK (job_id_1)
12:49:24 default: DT.save_record('2020-00-00 00:00:01.422541', 1.56182, 1.56213, 'EURCAD', 'postgresql cred') (job_id_2)
12:49:25 default: Job OK (job_id_2)
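For reference, here is a minimal sketch of what that enqueue step looks like with Redis Queue; the Redis location, the placeholder tick values and the connection string are assumptions, while the 'default' queue name matches the log output above:

from redis import Redis
from rq import Queue

redis_conn = Redis(host='localhost', port=6379)      # assumed Redis location
q = Queue('default', connection=redis_conn)          # 'default' matches the log output

# placeholder values standing in for one tick from the API stream
symbol, dt = 'GBPUSD', '2020-01-01 00:00:01.001'
price1, price2 = '1.30256', '1.30250'
db_url = 'postgresql://user:password@host/dbname'    # hypothetical credentials

# one job per tick, exactly the pattern that creates the backlog described below
q.enqueue(save_record, args=(dt, price1, price2, symbol, db_url))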
The queued jobs that insert each row directly into the database are the bottleneck: I can insert only 1-2 values per second, while I can receive over 60 values per second. If I run this saving, it starts to build a huge queue (the maximum I got was 17,000 records in the queue after 1 hour of listening to the API), and the queue never shrinks.
I'm currently using only 1 queue and 17 workers, which puts my PC's CPU at 100%.
So the question is how to optimize this process without building a huge queue. Maybe accumulate a sequence in JSON, for example, and then insert it into the DB, or store the incoming data in separate variables?
Sorry if anything is unclear; ask and I'll answer.
-- UPD -- Here is a short review of my experiments:
Moving engine and meta out of the function: due to my architecture, the API application is located on Windows 10 and the Redis Queue on Linux. Moving meta and engine out of the function caused an issue: it returns a TypeError (it does not depend on the OS); a little info about it here.
What worked: I added data_dict = {'data_pack': []} and started storing incoming values there. Then I check whether more than 20 values have already been stored for a symbol; if so, I send that batch to the Redis Queue, and it takes 1.5 seconds to write it to the database. Then I delete the taken records from data_dict and the process continues. So thanks to Mike Organek for the good advice. This approach is quite enough for my purposes, and at the same time I can say that this tech stack gives you really good flexibility!
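A rough sketch of that batching approach; the buffer layout, the threshold of 20, and the save_batch helper are my assumptions based on the description above, not the exact code:

from sqlalchemy import create_engine, MetaData, Table

data_dict = {'data_pack': []}
BATCH_SIZE = 20   # flush once this many rows are buffered for a symbol

def on_tick(symbol, dt, price1, price2, db_url, q):
    # buffer the incoming tick instead of enqueueing one job per row
    data_dict['data_pack'].append((symbol, dt, price1, price2))
    rows = [r for r in data_dict['data_pack'] if r[0] == symbol]
    if len(rows) >= BATCH_SIZE:
        # hand the whole batch to the Redis Queue as a single job
        q.enqueue(save_batch, args=(symbol, rows, db_url))
        # drop the rows that were just handed off
        data_dict['data_pack'] = [r for r in data_dict['data_pack'] if r[0] != symbol]

def save_batch(symbol, rows, adf):
    # one engine/metadata creation and one executemany-style insert per batch,
    # instead of one per row
    engine = create_engine(adf)
    meta = MetaData(bind=engine, reflect=True)
    table_obj = Table(symbol, meta)
    with engine.connect() as conn:
        conn.execute(
            table_obj.insert(),
            [{'Datetime': dt, 'price1': p1, 'price2': p2} for _, dt, p1, p2 in rows],
        )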
Upvotes: 1
Views: 249
Reputation: 123664
Every time you call save_record you re-create the engine and (reflected) meta objects, both of which are expensive operations. Running your sample code as-is gave me a throughput of

20 rows inserted in 4.9 seconds

Simply moving the engine = and meta = statements outside of the save_record function (and thereby only calling them once) improved throughput to

20 rows inserted in 0.3 seconds
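In code, the change is roughly this (a sketch; the connection string is a placeholder, and the adf argument is no longer needed once the engine is module-level):

from sqlalchemy import create_engine, MetaData, Table

# created once at module level instead of on every call
engine = create_engine('postgresql://user:password@host/dbname')   # placeholder credentials
meta = MetaData(bind=engine, reflect=True)

def save_record(Datetime, price1, price2, Instr):
    # reuse the module-level engine and reflected metadata
    table_obj = Table(Instr, meta)
    insert_state = table_obj.insert().values(Datetime=Datetime, price1=price1, price2=price2)
    with engine.connect() as conn:
        conn.execute(insert_state)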
Additional note: It appears that you are storing the values for each symbol in a separate table, i.e. 'GBPUSD' data in a table named GBPUSD, 'EURCAD' data in a table named EURCAD, etc. That is a "red flag" suggesting bad database design. You should be storing all of the data in a single table with a column for the symbol.
Upvotes: 1