Alex Gilleran

Reputation: 598

Create backpressure from a Future inside an Akka stream

I'm new to Akka streams and streams in general so I might have completely misunderstood something at a conceptual level, but is there any way I can create backpressure until a future resolves? Essentially what I want to do is like this:

object Parser {
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)

def doAsyncOp(values: Seq[ExampleObject]): Future[Any] = ???

Bytes are read from a file and streamed to the parser, which emits Seqs of ExampleObjects; those are passed to an async operation that returns a Future. I want the rest of the stream to be backpressured until that Future resolves, then resume, pass the next Seq[ExampleObject] to doAsyncOp, backpressure again, and so on.

Right now I've got this working with:

Await.result(doAsyncOp(values), 10 seconds)

But my understanding is that this blocks an entire thread, which is bad. Is there a better way around it?

If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too big to fit in memory) chunk by chunk with Jawn, then pass objects to ElasticSearch to be indexed as they're parsed. ElasticSearch has a queue of 50 pending operations; if it overflows, it starts rejecting new objects.

Upvotes: 7

Views: 870

Answers (1)

expert

Reputation: 30125

It's quite easy. You need to use mapAsync :)

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))
  .runWith(Sink.seq)

where 4 is the level of parallelism: at most four futures may be unresolved at once, and while that limit is reached mapAsync backpressures upstream. Results are emitted in upstream order; use mapAsyncUnordered if order doesn't matter, or mapAsync(1) if the operations must run strictly one at a time.
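The sequencing that mapAsync(1) gives — each future started only after the previous one resolves — can be sketched with plain standard-library Futures, no Akka required. The names here (process, runSequentially) are illustrative stand-ins for doAsyncOp, not part of any API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialFutures {
  // Illustrative stand-in for doAsyncOp: logs when each batch starts and ends.
  def process(batch: Int, log: collection.mutable.Buffer[String]): Future[Int] =
    Future {
      log += s"start $batch"
      Thread.sleep(10) // simulate async I/O latency
      log += s"end $batch"
      batch
    }

  // Chain the futures with flatMap so each batch starts only after the
  // previous one resolves — the one-in-flight behaviour of mapAsync(1).
  def runSequentially(batches: Seq[Int]): (Seq[Int], Seq[String]) = {
    val log = collection.mutable.Buffer.empty[String]
    val done = batches.foldLeft(Future.successful(Vector.empty[Int])) {
      (acc, b) => acc.flatMap(rs => process(b, log).map(rs :+ _))
    }
    (Await.result(done, 5.seconds), log.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val (results, log) = runSequentially(1 to 3)
    println(results.mkString(","))
    println(log.mkString(";")) // batches never overlap: each ends before the next starts
  }
}
```

With a higher parallelism level, mapAsync instead keeps up to that many futures in flight and only backpressures once the limit is hit, so throughput scales while memory stays bounded.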

Upvotes: 10

Related Questions