Reputation: 79
Problem statement: I am evaluating Apache Flink for modelling advanced, real-time, low-latency distributed analytics.
Use case abstract: Provide complex analytics for instruments I1, I2, I3, etc., each having a product definition (P1, P2, P3), configured with dynamic user parameters (U1, U2, U3), and requiring streaming market data (M1, M2, M3, ...). The instrument analytics functions (A1, A2) are computationally expensive; some of them can take 300-400 ms, but they can be computed in parallel. The market data stream is therefore much faster (<1 ms) than the analytics functions, which need to consume the latest consistent market data for their calculations. The next challenge is the multiple dependent enrichment functions E1, E2, E3 (e.g. Risk/PnL), which combine streaming market data with instrument analytics results (e.g. price or yield). The last challenge is consistency of the calculations: since function A1 could be faster than A2, I need a consistent result across all instruments for a given market data input.
Calculation Graph dependency examples (scale it to hundreds of instruments & 10-15 market data sources):
In case the image above is not visible, the dependency flow of the graph is:
- M1 + M2 + P1 => A2
- M1 + P1 => A1
- A1 + A2 => E2
- A1 => E1
- E1 + E2 => Result numbers
Questions:
What is the correct design/model for these calculation data streams? Currently I use ConnectedStreams for (P1 + M1). Would another approach be an iterative model that feeds the same instrument's static data back to itself?
I am having trouble using only the latest market data events in the calculations, because the analytics function (A1) is much slower than the market data stream (M1). I therefore need to evict stale market data before the next iteration, while retaining the last value for sources where no newer value is available (like an LRU cache).
I need to synchronize/correlate the execution of functions with different time complexity, so that iteration 2 starts only when everything in iteration 1 has finished.
Upvotes: 0
Views: 369
Reputation: 2371
This is quite a broad question and to answer it more precisely, one would need a few more details.
Below are a few thoughts that I hope will point you in a good direction and help you to approach your use case:
Connected streams by key (a.keyBy(...).connect(b.keyBy(...))) are the most powerful join- or union-like primitive. Using a CoProcessFunction on a connected stream should give you the flexibility to correlate or join values as needed. You can, for example, store the events from one stream in the state while waiting for a matching event to arrive from the other stream.
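As a rough illustration of that buffering idea, here is a minimal plain-Java sketch. It deliberately does not use the Flink API, so it runs standalone: the two HashMaps stand in for keyed state, and onLeft/onRight play the role that processElement1/processElement2 would play in a CoProcessFunction. The class name, method names, and the string output format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a buffering join; NOT Flink code.
// The maps stand in for per-key state; all names are hypothetical.
class BufferingJoin {
    private final Map<String, String> pendingLeft = new HashMap<>();
    private final Map<String, String> pendingRight = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Event from the first input (e.g. a product definition).
    void onLeft(String key, String value) {
        String match = pendingRight.remove(key);
        if (match != null) {
            output.add(key + ":" + value + "+" + match); // partner already waiting
        } else {
            pendingLeft.put(key, value); // buffer until the other side arrives
        }
    }

    // Event from the second input (e.g. a market data tick).
    void onRight(String key, String value) {
        String match = pendingLeft.remove(key);
        if (match != null) {
            output.add(key + ":" + match + "+" + value);
        } else {
            pendingRight.put(key, value);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BufferingJoin join = new BufferingJoin();
        join.onLeft("I1", "P1");  // buffered, nothing emitted yet
        join.onRight("I2", "M2"); // different key, buffered as well
        join.onRight("I1", "M1"); // matches the buffered P1
        System.out.println(join.output());
    }
}
```

Note that this sketch keeps only one pending event per key and side, so a second unmatched event overwrites the first; whether that is acceptable depends on the use case.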
Always holding the latest data of one input is easily doable by putting that value into the state of a CoFlatMapFunction or a CoProcessFunction. For each event from input 1, you store the event in the state. For each event from input 2, you look into the state to find the latest event from input 1.
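The "keep only the latest value" pattern can be sketched in plain Java as follows. Again, this is a standalone model and not the Flink API: the HashMap stands in for keyed state, and the names LatestValueJoin, onMarketData, and onCalculationRequest are made up for illustration. Input 1 is assumed to be the fast market data stream, input 2 the slower calculation requests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of "latest value in state"; NOT Flink code.
// The map stands in for per-key state; all names are hypothetical.
class LatestValueJoin {
    private final Map<String, Double> latestBySource = new HashMap<>();

    // Input 1: every tick overwrites the previous one, so stale data is evicted
    // implicitly and the last known value is retained until a newer one arrives.
    void onMarketData(String source, double value) {
        latestBySource.put(source, value);
    }

    // Input 2: a calculation request reads whatever is the latest value right now.
    Optional<Double> onCalculationRequest(String source) {
        return Optional.ofNullable(latestBySource.get(source));
    }

    public static void main(String[] args) {
        LatestValueJoin join = new LatestValueJoin();
        join.onMarketData("M1", 100.0);
        join.onMarketData("M1", 100.5); // replaces 100.0
        System.out.println(join.onCalculationRequest("M1")); // sees only the newest tick
        System.out.println(join.onCalculationRequest("M2")); // no tick seen yet
    }
}
```

Because a slow analytics function only reads the state when it is ready for the next calculation, it skips all intermediate ticks without any explicit eviction logic.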
To synchronize on time, you could actually look into using event time. Event time can also be "logical time", meaning just a version number, iteration number, or anything. You only need to make sure that the timestamp you assign and the watermarks you generate reflect that consistently.
If you then window by event time, you will get all data of that version together, regardless of whether one operator is faster than the others or whether the events arrive via paths with different latency. That is the beauty of real event time processing :-)
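To make the "logical time" idea more concrete, here is a simplified plain-Java model of how a version number can act as a timestamp. It is not the Flink API and glosses over many details of real windows and watermarks. It assumes each input (here the hypothetical names "A1" and "A2") tags its results with a version and then announces a watermark, meaning "I will send nothing more for versions up to this one". A version is released only once the minimum watermark over all inputs has reached it, so a fast input cannot run ahead of a slow one.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of version-based ("logical time") alignment; NOT Flink code.
// All names are hypothetical.
class VersionAligner {
    // Results buffered per version, then per input name.
    private final TreeMap<Long, Map<String, Double>> pending = new TreeMap<>();
    // Highest watermark announced so far by each input.
    private final Map<String, Long> watermarks = new HashMap<>();

    VersionAligner(String... inputs) {
        for (String input : inputs) {
            watermarks.put(input, Long.MIN_VALUE);
        }
    }

    // A result from one input, tagged with the version it was computed for.
    void onResult(String input, long version, double value) {
        pending.computeIfAbsent(version, v -> new HashMap<>()).put(input, value);
    }

    // An input declares it is done with all versions <= watermark.
    // Returns every version that is now complete across all inputs.
    TreeMap<Long, Map<String, Double>> onWatermark(String input, long watermark) {
        watermarks.merge(input, watermark, Long::max);
        long combined = Collections.min(watermarks.values());
        TreeMap<Long, Map<String, Double>> ready =
                new TreeMap<>(pending.headMap(combined, true));
        pending.headMap(combined, true).clear(); // drop what has been released
        return ready;
    }

    public static void main(String[] args) {
        VersionAligner aligner = new VersionAligner("A1", "A2");
        aligner.onResult("A1", 1L, 101.2);
        System.out.println(aligner.onWatermark("A1", 1L)); // A2 not done: nothing released
        aligner.onResult("A2", 1L, 3.4);
        System.out.println(aligner.onWatermark("A2", 1L)); // version 1 released with both results
    }
}
```

In this model a downstream step for version 1 (for example E2 = f(A1, A2)) would only see the results once both A1 and A2 have finished that version, which is the synchronization asked for in the question.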
Upvotes: 4