Reputation: 75
Using WSO2 SP, my app reads lines from the following CSV file:
20170801 000001237,1.321420,1.321510,0
20170801 000001487,1.321440,1.321530,0
20170801 000001737,1.321450,1.321530,0
20170801 000001987,1.321440,1.321530,0
The first column is a timestamp string that needs to be parsed so it can be used as the event timestamp. Its format is:
yyyyMMdd hhmmssfff
where fff is the millisecond part.
My current app looks like this:
@App:name('ReceiveAndCount')
@App:description('count events in csv file')
@source(type = 'file',
mode='line',
tailing='false',
file.uri = "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv",
action.after.process='NONE',
@map(type = 'csv', header='false', delimiter = ",",
@attributes(
dateTime = '0',
bid='1',
ask='2',
ignore='3' ) ))
define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);
@sink(type = 'log', priority='info')
define stream TotalCountStream (totalCount long);
-- Count the incoming events
@info(name = 'query1')
from csvGBPUSDstream
select count() as totalCount
insert into TotalCountStream;
Any guidance on parsing the date string in the first column as the event timestamp is greatly appreciated.
Upvotes: 0
Views: 357
Reputation: 75
I run wso2sp-editor in Docker with the following command; the volume switch (-v) lets the container read the local data file:
docker run -it -p 9390:9390 -v /Users/A/Desktop/wso2sp-editor-docker-data:/home/wso2carbon/wso2sp-4.4.0/data/ --name editor wso2/wso2sp-editor:4.4.0
Finally the app code:
@App:name('CsvParseDateTime')
@App:description('parse timestamp string from csv file')

@source(type = 'file',
    mode = "line",
    tailing = "false",
    file.uri = "file://home/wso2carbon/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv",
    -- path when running outside docker:
    -- "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv",
    action.after.process = "NONE",
    @map(type = 'csv', header = "false", delimiter = ",",
        @attributes(dateTime = "0", ask = "2", ignore = "3", bid = "1")))
define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);

@sink(type = 'log', priority = "info")
define stream fiveSecRangeStream (timestamp long, fiveSecRange double);

-- stream that parses the dateTime string into a millisecond timestamp usable as the index for time windows
-- time:timestampInMilliseconds(date.value, date.format), e.g. with format yyyy-MM-dd HH:mm:ss.SSS
@info(name = 'Parse Timestamp')
from csvGBPUSDstream
select time:timestampInMilliseconds(dateTime, 'yyyyMMdd HHmmssSSS') as timestamp, bid, ask
insert into indexedGBPUSDstream;

-- external time window (driven by the timestamp carried in the event stream)
from indexedGBPUSDstream#window.externalTime(timestamp, 5 sec)
select timestamp, max(bid) - min(bid) as fiveSecRange
insert into fiveSecRangeStream; -- for all events
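To check what the pattern 'yyyyMMdd HHmmssSSS' produces for a row of the CSV, here is a minimal Java sketch. It uses java.text.SimpleDateFormat as a stand-in for the Siddhi time extension; whether time:timestampInMilliseconds() behaves identically is an assumption, and so is the UTC time zone chosen here. The class and method names (CsvTimestamp, toEpochMillis) are made up for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical helper, not part of WSO2 SP or Siddhi.
final class CsvTimestamp {
    // Same pattern string as in the Siddhi query: HH = hour of day (0-23), SSS = milliseconds.
    static final String PATTERN = "yyyyMMdd HHmmssSSS";

    // Parses the first CSV column into epoch milliseconds.
    static long toEpochMillis(String dateTime) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        // Assumption: the file's timestamps are interpreted as UTC.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        format.setLenient(false); // reject out-of-range fields instead of rolling them over
        try {
            return format.parse(dateTime).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + dateTime, e);
        }
    }

    public static void main(String[] args) {
        // First row of the sample file.
        System.out.println(toEpochMillis("20170801 000001237")); // prints 1501545601237
    }
}
```

Consecutive rows of the sample file come out 250 ms apart, which is the spacing the external time window then works with.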
Upvotes: 0
Reputation: 75
The answer is to parse the string with time:timestampInMilliseconds() and then use the resulting timestamp in the #window.externalTime() function:
@App:name('CsvParseDateTime')
@App:description('parse timestamp string from csv file')

@source(type = 'file',
    mode = "line",
    tailing = "false",
    file.uri = "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv",
    action.after.process = "NONE",
    @map(type = 'csv', header = "false", delimiter = ",",
        @attributes(dateTime = "0", ask = "2", ignore = "3", bid = "1")))
define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);

@sink(type = 'log', priority = "info")
define stream fiveSecRangeStream (timestamp long, fiveSecRange double);

-- stream that parses the dateTime string into a millisecond timestamp usable as the index for time windows
-- time:timestampInMilliseconds(date.value, date.format), e.g. with format yyyy-MM-dd HH:mm:ss.SSS
@info(name = 'Parse Timestamp')
from csvGBPUSDstream
select time:timestampInMilliseconds(dateTime, 'yyyyMMdd HHmmssSSS') as timestamp, bid, ask
insert into indexedGBPUSDstream;

-- external time window (driven by the timestamp carried in the event stream)
from indexedGBPUSDstream#window.externalTime(timestamp, 5 sec)
select timestamp, max(bid) - min(bid) as fiveSecRange
insert into fiveSecRangeStream; -- for all events
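For intuition about what the last query computes, here is a rough Java sketch of a sliding window driven by the event's own timestamp rather than by wall-clock time. It is not Siddhi's implementation: the class name ExternalTimeRange is hypothetical, and the eviction boundary (an event is dropped once it is 5 seconds or more older than the newest event) is an assumption about how externalTime treats the edge.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of an external-time sliding window; not Siddhi code.
final class ExternalTimeRange {
    private static final class Tick {
        final long timestamp;
        final double bid;
        Tick(long timestamp, double bid) { this.timestamp = timestamp; this.bid = bid; }
    }

    private final long windowMillis;
    private final Deque<Tick> window = new ArrayDeque<>();

    ExternalTimeRange(long windowMillis) { this.windowMillis = windowMillis; }

    // Feeds one event (timestamps must be non-decreasing) and returns
    // max(bid) - min(bid) over the events still inside the window.
    double onEvent(long timestamp, double bid) {
        // Assumed boundary rule: evict events at least windowMillis older than this one.
        while (!window.isEmpty() && window.peekFirst().timestamp <= timestamp - windowMillis) {
            window.pollFirst();
        }
        window.addLast(new Tick(timestamp, bid));
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        for (Tick t : window) {
            max = Math.max(max, t.bid);
            min = Math.min(min, t.bid);
        }
        return max - min;
    }

    public static void main(String[] args) {
        ExternalTimeRange range = new ExternalTimeRange(5000L);
        long t0 = 1501545601237L; // 20170801 000001237 interpreted as UTC
        System.out.println(range.onEvent(t0, 1.321420));
        System.out.println(range.onEvent(t0 + 250, 1.321440));
        System.out.println(range.onEvent(t0 + 500, 1.321450));
    }
}
```

Because the window advances only when a new event arrives, replaying a historical CSV file gives the same ranges no matter how fast the file is read.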
Upvotes: 0