liaolunhui
liaolunhui

Reputation: 33

how to make akka stream source queue FIFO?

I am develop a stock trading system using akka. I offer orderbooks into TradeQueue like below:

val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
.map(task=>{
  println("TradeTask Start:"+task)
  task
})
.via(ProcessA)
.via(ProcessC)
.via(ProcessC)
.toMat(Sink.foreach(task => {
     log.info("TradeTask finish:"+task)
 }))(Keep.left).run()


 for (item <- 1 to 100) {
    val task = TradeTask(item)
    tradeQueue.offer(task)
 }   

But the sequence is disordered.

like this:

TradeTask Start:TradeTask(1)

TradeTask Start:TradeTask(2)

TradeTask finish:TradeTask(1)

TradeTask finish:TradeTask(2)

But I want FIFO and element enqueue before previous finish,like this

TradeTask Start:TradeTask(1)

TradeTask finish:TradeTask(1)

TradeTask Start:TradeTask(2)

TradeTask finish:TradeTask(2)

How to do that? Thanks

Upvotes: 0

Views: 686

Answers (1)

Already a FIFO

Your question already proves that the queue is "(F)irst (I)n (F)irst (O)ut". As shown in the output the first element to enter the stream, TradeTask(1), was the first element to be processed by the Sink:

TradeTask Start:TradeTask(1)    // <-- FIRST IN

TradeTask Start:TradeTask(2)

TradeTask finish:TradeTask(1)   // <-- FIRST OUT

TradeTask finish:TradeTask(2)

Indirect Answer

The question you are asking is completely opposite to the purpose/usage of akka-stream. You are asking how to create an akka stream that does all of the processing serially, as opposed to asynchronously.

The whole point of akka is asynchronous processing & communication:

Welcome to Akka, a set of open-source libraries for designing scalable, resilient systems that span processor cores and networks.

If you want the processing done one-at-a-time then why go with akka in the first place? Synchronous processing of Iterable elements is easily accomplished, without akka, using scala collections and for-comprehensions:

val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???

val logTask : Task => Unit = 
  (task) => log.info("TradeTask finish:" + task)

for {
  item    <- 1 to 100
  task    <- TradeTask(item)
  aResult <- functionA(task)
  bResult <- functionB(aResult)
  cResult <- functionC(bResult)
} { 
  logTask(cResult)
}

Similarly you can use function composition and a simplified iteration:

val compositeFunction : Int => Unit = 
  TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask

(1 to 100) foreach compositeFunction

Upvotes: 1

Related Questions