Reputation: 14317
I built a data loader prototype that saves CSV into splayed tables. The workflow is as follows:
Create schema the first time e.g. volatilitysurface
table:
volatilitysurface::([date:`datetime$(); ccypair:`symbol$()] atm_convention:`symbol$(); premium_included:`boolean$(); smile_type:`symbol$(); vs_type:`symbol$(); delta_ratio:`float$(); delta_setting:`float$(); wing_extrapolation:`float$(); spread_type:`symbol$());
For every file in the rawdata folder import it:
myfiles:@[system;"dir /b /o:gn ",string `$getenv[`KDBRAWDATA],"*.volatilitysurface.csv 2> nul";()];
if[myfiles~();.lg.o[`load;"no volatilitysurface files found!"];:0N];
.lg.o[`load;"loading data files ..."];
/ load each file
{
mypath:"" sv (string `$getenv[`KDBRAWDATA];x);
.lg.o[`load;"loading file name '",mypath,"' ..."];
myfile:hsym`$mypath;
tmp1:select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from ("ZSSSSSFFFS";enlist ",")0:myfile;
`volatilitysurface upsert tmp1;
} @/: myfiles;
delete tmp1 from `.;
.Q.gc[];
.lg.o[`done;"loading volatilitysurface data done"];
.lg.o[`save;"saving volatilitysurface schema to ",string afolder];
volatilitysurface::0!volatilitysurface;
.Q.dpft[afolder;`;`ccypair;`volatilitysurface];
.lg.o[`cleanup;"removing volatilitysurface from memory"];
delete volatilitysurface from `.;
.Q.gc[];
.lg.o[`done;"saving volatilitysurface schema done"];
This works perfectly. I use .Q.gc[];
frequently to avoid hitting the wsfull
. When new CSV files are available I open the existing schema, upsert into it and save it again effectively overwriting the existing HDB file system.
Open schema:
.lg.o[`open;"tables already exists, opening the schema ..."];
@[system;"l ",(string afolder) _ 0;{.lg.e[`open;"failed to load hdb directory: ", x]; 'x}];
/ Re-create table index
volatilitysurface::`date`ccypair xkey select from volatilitysurface;
Re-run step #2 to append new CSV files into the existing volatilitysurface
table, it upserts the first CSV perfectly but the second CSV fails with:
error: `cast
I debug to the point of the error and to double-check I see that the metadata of tmp1
and volatilitysurface
are perfectly the same. Any ideas why this is happening? I get the same issue with any other table. I have tried cleaning the keys from the table after every upsert but doesn't help i.e.
volatilitysurface::0!volatilitysurface;
volatilitysurface::`date`ccypair xkey volatilitysurface;
And the metadata comparison at the point of the cast error:
meta tmp1
c | t f a
------------------| -----
date | z
ccypair | s
atm_convention | s
premium_included | b
smile_type | s
vs_type | s
delta_ratio | f
delta_setting | f
wing_extrapolation| f
spread_type | s
meta volatilitysurface
c | t f a
------------------| -----
date | z
ccypair | s p
atm_convention | s
premium_included | b
smile_type | s
vs_type | s
delta_ratio | f
delta_setting | f
wing_extrapolation| f
spread_type | s
UPDATE Using the input of the answer below I tried using Torq's .loader.loadallfiles
function like this (it doesn't fail but nothing happens either, the table is not created in memory and the data is not written to the database):
.loader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc!(`x`ccypair`atm_convention`premium_included`smile_type`vs_type`delta_ratio`delta_setting`wing_extrapolation`spread_type;"ZSSSSSFFFS";enlist ",";`volatilitysurface;`:hdb; {[p;t] select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from t}); `:rawdata]
UDPATE2 This is the output I get from TorQ:
2017.11.20D08:46:12.550618000|wsp18497wn|dataloader|dataloader1|INF|dataloader|**** LOADING :rawdata/20171102_113420.disccurve.csv ****
2017.11.20D08:46:12.550618000|wsp18497wn|dataloader|dataloader1|INF|dataloader|reading in data chunk
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|Read 10000 rows
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|processing data
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|Enumerating
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 4525 rows to :hdb/2017.09.12/volatilitysurface/
2017.11.20D08:46:12.581819000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 4744 rows to :hdb/2017.09.13/volatilitysurface/
2017.11.20D08:46:12.659823000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 731 rows to :hdb/2017.09.14/volatilitysurface/
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|init|retrieving sort settings from :C:/Dev/torq//config/sort.csv
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sort|sorting the volatilitysurface table
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sorttab|No sort parameters have been specified for : volatilitysurface. Using default parameters
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sortfunction|sorting :hdb/2017.09.05/volatilitysurface/ by these columns : sym, time
2017.11.20D08:46:12.753428000|wsp18497wn|dataloader|dataloader1|ERR|sortfunction|failed to sort :hdb/2017.09.05/volatilitysurface/ by these columns : sym, time. The error was: hdb/2017.09.
I get the following error sorttab|No sort parameters have been specified for : volatilitysurface. Using default parameters
where is this sorttab documented? does it use the table PK by default?
UPDATE3 Ok fixed UPDATE2 out by providing a non-default sort.csv
under my config
folder:
tabname,att,column,sort
default,p,sym,1
default,,time,1
volatilitysurface,,date,1
volatilitysurface,,ccypair,1
But now I see that if I call the function multiple times on the same files, it simply appends duplicated data instead of upsert
ing it.
UPDATE4 Still not there yet ... assuming I can check to make sure that no duplicate file is used. When I load and then start the database I get some structure back that ressembles some sort of dictionary and not a table.
2017.10.31| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.01| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.02| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.03| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
sym | `AUDNOK`AUDCNH`AUDJPY`AUDHKD`AUDCHF`AUDSGD`AUDCAD`AUDDKK`CADSGD`C..
Note that date is actually datetime Z and not just date. My full and latest version of the function invocation is:
target:hsym `$("" sv ("./";getenv[`KDBHDB];"/volatilitysurface"));
rawdatadir:hsym `$getenv[`KDBRAWDATA];
.loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitioncol`dataprocessfunc!(`x`ccypair`atm_convention`premium_included`smile_type`vs_type`delta_ratio`delta_setting`wing_extrapolation`spread_type;"ZSSSSSFFFS";enlist ",";`volatilitysurface;target;`date;{[p;t] select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from t}); rawdatadir];
Upvotes: 3
Views: 1159
Reputation: 2991
I'm going to add a second answer here to try and tackle the question about using TorQ's data loader.
I'd like to clarify what output you are getting after running this function? There should be some logging messages output, can you post these? For example when I run the function:
jmcmurray@homer ~/deploy/TorQ (master) $ q torq.q -procname loader -proctype loader -debug
<torq startup messages removed>
q).loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitioncol`dataprocessfunc!(c;"TSSFJFFJJBS";enlist",";`quotes;`:testdb;`date;{[p;t] select date:.z.d,time:TIME,sym:INSTRUMENT,BID,ASK from t});`:csvtest]
2017.11.17D15:03:20.312336000|homer.aquaq.co.uk|loader|loader|INF|dataloader|**** LOADING :csvtest/tradesandquotes20140421.csv ****
2017.11.17D15:03:20.319110000|homer.aquaq.co.uk|loader|loader|INF|dataloader|reading in data chunk
2017.11.17D15:03:20.339414000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Read 11000 rows
2017.11.17D15:03:20.339463000|homer.aquaq.co.uk|loader|loader|INF|dataloader|processing data
2017.11.17D15:03:20.339519000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Enumerating
2017.11.17D15:03:20.340061000|homer.aquaq.co.uk|loader|loader|INF|dataloader|writing 11000 rows to :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.341669000|homer.aquaq.co.uk|loader|loader|INF|dataloader|**** LOADING :csvtest/tradesandquotes20140422.csv ****
2017.11.17D15:03:20.349606000|homer.aquaq.co.uk|loader|loader|INF|dataloader|reading in data chunk
2017.11.17D15:03:20.370793000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Read 11000 rows
2017.11.17D15:03:20.370858000|homer.aquaq.co.uk|loader|loader|INF|dataloader|processing data
2017.11.17D15:03:20.370911000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Enumerating
2017.11.17D15:03:20.371441000|homer.aquaq.co.uk|loader|loader|INF|dataloader|writing 11000 rows to :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.460118000|homer.aquaq.co.uk|loader|loader|INF|init|retrieving sort settings from :/home/jmcmurray/deploy/TorQ/config/sort.csv
2017.11.17D15:03:20.466690000|homer.aquaq.co.uk|loader|loader|INF|sort|sorting the quotes table
2017.11.17D15:03:20.466763000|homer.aquaq.co.uk|loader|loader|INF|sorttab|No sort parameters have been specified for : quotes. Using default parameters
2017.11.17D15:03:20.466820000|homer.aquaq.co.uk|loader|loader|INF|sortfunction|sorting :testdb/2017.11.17/quotes/ by these columns : sym, time
2017.11.17D15:03:20.527216000|homer.aquaq.co.uk|loader|loader|INF|applyattr|applying p attr to the sym column in :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.535095000|homer.aquaq.co.uk|loader|loader|INF|sort|finished sorting the quotes table
After all this, I can run \l testdb
and there is a table called "quotes" containing my loaded data
If you can post logging messages like these, it could be helpful to see what's going on.
UPDATE
"But now I see that if I call the function multiple times on the same files, it simply appends duplicated data instead of upserting it."
If I'm understanding the problem correctly, it sounds like you likely shouldn't call the function multiple times on the same files. Another process within TorQ could be useful here, the "file alerter". This process will monitor a directory for new & updated files, and can call a function on any that appear (so you can have it call the loader function with every new file automatically). It has a number of options such as moving files after processing (so you can "archive" loaded CSVs)
Note that the file alerter requires that a function take exactly two parameters - the directory & the file name. This effectively means you will need a "wrapper" function around the loader function, which takes a dictionary & a directory. I don't think TorQ includes a function similar to .loader.loadallfiles for a single file, so it might be necessary to copy the target file to a temporary directory, run loadallfiles on that directory and then delete the file from there before loading the next.
Upvotes: 1
Reputation: 2991
`cast error refers to a value not being enumerated
I can't see any enumeration going on here, splayed tables on disk need to have symbol columns enumerated. For example, this can be done with the following line, before calling .Q.dpft
volatilitysurface:.Q.en[afolder;volatilitysurface];
You may like to consider using an example CSV loader for loading your data. One such example is included in TorQ, the KDB framework developed by AquaQ Analytics (as a disclaimer, I work for AquaQ)
The framework is available (free of charge) here: https://github.com/AquaQAnalytics/TorQ
The specific component you will likely be interested in is dataloader.q and is documented here: http://aquaqanalytics.github.io/TorQ/utilities/#dataloaderq
This script will handle everything necessary, loading all files, enumerating, sorting on disk, applying attributes etc. as well as using .Q.fsn to prevent running out of memory
Upvotes: 1