synapse
synapse

Reputation: 5728

How to react to upstream completion in my flow?

Suppose there's an Akka stream flow defined like this:

def tee = {
  var writer: Writer = ???

  Flow.fromFunction[String, String] { msg =>
    writer.write(msg)
    msg
  }
}

When the upstream is finished it needs to flush an close the writer. Is there a way to do it without resorting to GraphStageLogic etc as described here https://doc.akka.io/docs/akka/current/stream/stream-customize.html?

Upvotes: 0

Views: 123

Answers (1)

Ivan Stanislavciuc
Ivan Stanislavciuc

Reputation: 7275

It's not possible doing this with Flow without converting it to a Sink.

If sink is an option, do following

  def tee = {
    val writer: Writer = new StringWriter()

    Sink
      .foreach[String] { msg =>
        writer.write(msg)
      }
      .mapMaterializedValue(_.map { done =>
        writer.close()
        done
      })
  }

The similar thing can be done with the help of akka.stream.scaladsl.StreamConverters as following

    val sink: Sink[String, Future[IOResult]] = {
      StreamConverters.fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer)).contramap[String](ByteString.apply)
    }

Upvotes: 1

Related Questions