roni762583

Reputation: 75

Parsing DateTime String from CSV File As Event Timestamp

Using WSO2 SP, my app reads lines from the following CSV file:

20170801 000001237,1.321420,1.321510,0
20170801 000001487,1.321440,1.321530,0
20170801 000001737,1.321450,1.321530,0
20170801 000001987,1.321440,1.321530,0

The first column is a timestamp string that needs to be parsed and used as the event timestamp. Its format is:

yyyyMMdd hhmmssfff

where fff is the milliseconds part.
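For illustration, here is a minimal Java sketch of what turning that column into epoch milliseconds involves. It assumes the Java-style pattern `yyyyMMdd HHmmssSSS` (`HH` for the 24-hour clock, `SSS` for milliseconds) as the equivalent of the format above, and it assumes the timestamps are UTC, since the file does not state a time zone. The class and method names are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

final class CsvTimestampSketch {

    // Hypothetical helper: parse the first CSV column into epoch milliseconds.
    static long toEpochMillis(String dateTime) {
        // Java pattern letters: HH = hour 0-23, SSS = milliseconds.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd HHmmssSSS");
        // Assumption: the data is in UTC (not stated in the question).
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        format.setLenient(false);
        try {
            return format.parse(dateTime).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + dateTime, e);
        }
    }

    public static void main(String[] args) {
        String line = "20170801 000001237,1.321420,1.321510,0";
        String dateTime = line.split(",")[0];
        System.out.println(toEpochMillis(dateTime)); // prints 1501545601237
    }
}
```

With a different time zone assumption the resulting value shifts by the zone offset, so the zone is worth pinning down before using the value in a time window.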

My current app looks like this:

@App:name('ReceiveAndCount')
@App:description('count events in csv file')
@source(type = 'file', 
    mode='line',
    tailing='false',
    file.uri = "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv", 
    action.after.process='NONE',
        @map(type = 'csv', header='false', delimiter = ",",
        @attributes(
        dateTime = '0',
        bid='1',
        ask='2',
        ignore='3'  ) ))

define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);

@sink(type = 'log', priority='info')
define stream TotalCountStream (totalCount long);

-- Count the incoming events
@info(name = 'query1')
from csvGBPUSDstream 
select count() as totalCount
insert into TotalCountStream;

Any guidance on parsing the date string in the first column as the event timestamp is greatly appreciated.

Upvotes: 0

Views: 357

Answers (2)

roni762583

Reputation: 75

I ran wso2sp-editor in Docker with the following command. The -v volume switch lets the container read the local data file:

docker run -it -p 9390:9390 -v /Users/A/Desktop/wso2sp-editor-docker-data:/home/wso2carbon/wso2sp-4.4.0/data/ --name editor wso2/wso2sp-editor:4.4.0

Finally, the app code:

@App:name('CsvParseDateTime')
@App:description('parse timestamp string from csv file')

@source(type = 'file',
    mode = "line",
    tailing = "false",
    file.uri = "file://home/wso2carbon/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv",
    --"file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv",
    action.after.process = "NONE",
    @map(type = 'csv', header = "false", delimiter = ",",
        @attributes(dateTime = "0", ask = "2", ignore = "3", bid = "1")))
define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);

@sink(type = 'log', priority = "info")
define stream fiveSecRangeStream (timestamp long, fiveSecRange double);

-- stream to parse dateTime string to millisecond time that can be used as index for time windows
-- time:timestampInMilliseconds(date.value, date.format), e.g. yyyy-MM-dd HH:mm:ss.SSS
@info(name = 'Parse Timestamp')
from csvGBPUSDstream
select time:timestampInMilliseconds(dateTime, 'yyyyMMdd HHmmssSSS') as timestamp, bid, ask
insert into indexedGBPUSDstream;

-- external window time (timestamp in event stream)
from indexedGBPUSDstream#window.externalTime(timestamp, 5 sec)
select timestamp, max(bid) - min(bid) as fiveSecRange
insert into fiveSecRangeStream;
-- for all-events ;

Upvotes: 0

roni762583

Reputation: 75

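The answer is to parse the string with time:timestampInMilliseconds() and then use #window.externalTime(), which windows on a timestamp attribute carried in the event: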

@App:name('CsvParseDateTime')
@App:description('parse timestamp string from csv file')

@source(type = 'file',
    mode = "line",
    tailing = "false",
    file.uri = "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv",
    action.after.process = "NONE",
    @map(type = 'csv', header = "false", delimiter = ",",
        @attributes(dateTime = "0", ask = "2", ignore = "3", bid = "1")))
define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);

@sink(type = 'log', priority = "info")
define stream fiveSecRangeStream (timestamp long, fiveSecRange double);

-- stream to parse dateTime string to millisecond time that can be used as index for time windows
-- time:timestampInMilliseconds(date.value, date.format), e.g. yyyy-MM-dd HH:mm:ss.SSS
@info(name = 'Parse Timestamp')
from csvGBPUSDstream
select time:timestampInMilliseconds(dateTime, 'yyyyMMdd HHmmssSSS') as timestamp, bid, ask
insert into indexedGBPUSDstream;

-- external window time (timestamp in event stream)
from indexedGBPUSDstream#window.externalTime(timestamp, 5 sec)
select timestamp, max(bid) - min(bid) as fiveSecRange
insert into fiveSecRangeStream;
-- for all-events ;
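To make the windowing idea concrete, here is a rough Java sketch of a sliding window keyed on the event's own timestamp, computing max(bid) - min(bid) over the last five seconds of event time. This is a simplified model and not Siddhi's implementation: the exact eviction boundary and the handling of expired events are assumptions, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ExternalTimeRangeSketch {

    // One tick: event timestamp (epoch ms) and bid price.
    private static final class Tick {
        final long timestamp;
        final double bid;

        Tick(long timestamp, double bid) {
            this.timestamp = timestamp;
            this.bid = bid;
        }
    }

    private final long windowMillis;
    private final Deque<Tick> window = new ArrayDeque<>();

    ExternalTimeRangeSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds an event and returns max(bid) - min(bid) over the events still in the window.
    double onEvent(long timestamp, double bid) {
        // Assumption: an event expires once it is windowMillis or more older
        // than the newest event's timestamp (event time, not wall-clock time).
        while (!window.isEmpty() && window.peekFirst().timestamp <= timestamp - windowMillis) {
            window.pollFirst();
        }
        window.addLast(new Tick(timestamp, bid));

        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        for (Tick tick : window) {
            min = Math.min(min, tick.bid);
            max = Math.max(max, tick.bid);
        }
        return max - min;
    }

    public static void main(String[] args) {
        // Timestamps are the first four CSV rows, parsed as UTC (an assumption).
        ExternalTimeRangeSketch ranges = new ExternalTimeRangeSketch(5_000L);
        System.out.println(ranges.onEvent(1501545601237L, 1.321420));
        System.out.println(ranges.onEvent(1501545601487L, 1.321440));
        System.out.println(ranges.onEvent(1501545601737L, 1.321450));
        System.out.println(ranges.onEvent(1501545601987L, 1.321440));
    }
}
```

The point is that the window advances only when events arrive, using the parsed timestamp, so replaying a historical file gives the same ranges regardless of how fast the file is read.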

Upvotes: 0
