Reputation: 33
I am developing a stock trading system using Akka. I offer order books to a trade queue as shown below:
val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
  .map(task => {
    println("TradeTask Start:" + task)
    task
  })
  .via(ProcessA)
  .via(ProcessC)
  .via(ProcessC)
  .toMat(Sink.foreach(task => {
    log.info("TradeTask finish:" + task)
  }))(Keep.left).run()
for (item <- 1 to 100) {
  val task = TradeTask(item)
  tradeQueue.offer(task)
}
But the output is not in the order I expected. It looks like this:
TradeTask Start:TradeTask(1)
TradeTask Start:TradeTask(2)
TradeTask finish:TradeTask(1)
TradeTask finish:TradeTask(2)
But I want FIFO processing in which each element finishes before the next one starts, like this:
TradeTask Start:TradeTask(1)
TradeTask finish:TradeTask(1)
TradeTask Start:TradeTask(2)
TradeTask finish:TradeTask(2)
How can I do that? Thanks.
Upvotes: 0
Views: 686
Reputation: 17963
Already a FIFO
Your question already proves that the queue is "(F)irst (I)n (F)irst (O)ut". As shown in the output, the first element to enter the stream, TradeTask(1), was the first element to be processed by the Sink:
TradeTask Start:TradeTask(1) // <-- FIRST IN
TradeTask Start:TradeTask(2)
TradeTask finish:TradeTask(1) // <-- FIRST OUT
TradeTask finish:TradeTask(2)
Indirect Answer
The question you are asking is completely opposite to the purpose/usage of akka-stream. You are asking how to create an akka stream that does all of the processing serially, as opposed to asynchronously.
The whole point of akka is asynchronous processing & communication:
Welcome to Akka, a set of open-source libraries for designing scalable, resilient systems that span processor cores and networks.
If you want the processing done one at a time, then why go with akka in the first place? Synchronous processing of Iterable elements is easily accomplished, without akka, using scala collections and for-comprehensions:
// Placeholders for the three processing stages
val functionA : TradeTask => TradeTask = ???
val functionB : TradeTask => TradeTask = ???
val functionC : TradeTask => TradeTask = ???
val logTask : TradeTask => Unit =
  (task) => log.info("TradeTask finish:" + task)
// Each task passes through every stage before the next task is created
for (item <- 1 to 100) {
  val task = TradeTask(item)
  val aResult = functionA(task)
  val bResult = functionB(aResult)
  val cResult = functionC(bResult)
  logTask(cResult)
}
Similarly you can use function composition and a simplified iteration:
val compositeFunction : Int => Unit =
  ((item: Int) => TradeTask(item)) andThen functionA andThen functionB andThen functionC andThen logTask
(1 to 100) foreach compositeFunction
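For something that can be run as-is, here is a minimal sketch of the same serial approach without akka. The TradeTask definition (a case class wrapping an Int) and the identity stage functions are assumptions, since neither is shown in the question. The log lines are collected into a list rather than written to a logger so the ordering is easy to inspect.

```scala
// Hypothetical stand-in: the question does not show how TradeTask is defined.
final case class TradeTask(id: Int)

object SerialTrades {
  // Identity stages (assumption); real stages would transform the task.
  val functionA: TradeTask => TradeTask = identity
  val functionB: TradeTask => TradeTask = identity
  val functionC: TradeTask => TradeTask = identity

  // Runs one task through every stage and returns its two log lines.
  def process(item: Int): List[String] = {
    val task = TradeTask(item)
    val start = s"TradeTask Start:$task"
    val done = (functionA andThen functionB andThen functionC)(task)
    List(start, s"TradeTask finish:$done")
  }

  // Tasks are handled strictly one after another, in submission order.
  def run(items: Range): List[String] = items.toList.flatMap(process)

  def main(args: Array[String]): Unit = run(1 to 2).foreach(println)
}
```

Because process returns only after all three stages have completed, the "finish" line of one task always precedes the "Start" line of the next, which is the ordering asked for in the question.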
Upvotes: 1