artyomboyko
artyomboyko

Reputation: 2871

Sending actorRefWithAck inside stream

I'm using answer from this thread because I need to treat first element especially. The problem is, I need to send this data to another Actor or persist locally (which is not possibl).

So, my stream looks like this:

val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] = Flow.fromSinkAndSourceMat(
  Flow[Message].mapAsync[Trade](1) {
    case TextMessage.Strict(text) =>
      Unmarshal(text).to[Trade]
    case streamed: TextMessage.Streamed =>
      streamed.textStream.runFold("")(_ ++ _).flatMap(Unmarshal(_).to[Trade])
  }.groupBy(pairs.size, _.s).prefixAndTail(1).flatMapConcat {
    case (head, tail) =>
      // sending first element here
      val result = Source(head).to(Sink.actorRefWithAck(
        ref = actor,
        onInitMessage = Init,
        ackMessage = Ack,
        onCompleteMessage = "done"
      )).run()
      // some kind of operation on the result
    Source(head).concat(tail)
  }.mergeSubstreams.toMat(sink)(Keep.right),
  Source.maybe[Message])(Keep.both)

Is this a good practice? Will it have unintended consequences? Unfortunately, I cannot call persist inside stream, so I want to send this data to the external system.

Upvotes: 0

Views: 82

Answers (1)

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

Your current approach doesn't use result in any way, so a simpler alternative would be to fire and forget the first Message to the actor:

groupBy(pairs.size, _.s).prefixAndTail(1).flatMapConcat {
  case (head, tail) =>
    // sending first element here
    actor ! head.head

    Source(head).concat(tail)
}

The actor would then not have to worry about handling Init and sending Ack messages and could be solely concerned with persisting Message instances.

Upvotes: 1

Related Questions