koleS

Reputation: 1313

Processing an akka stream asynchronously and writing to a file sink

I am trying to write a piece of code that would consume a stream of tickers (a company's stock exchange symbol) and fetch company information from a REST API for each ticker.

I want to fetch information for multiple companies asynchronously.

I would like to save the results to a file in a continuous manner as the entire data set might not fit into memory.

Following the Akka Streams documentation and the resources I was able to find on this subject, I came up with the following code (some parts are omitted for brevity):

  implicit val actorSystem: ActorSystem = ActorSystem("stock-fetcher-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer(None, Some("StockFetcher"))(actorSystem)
  implicit val context = actorSystem.dispatcher

  import CompanyJsonMarshaller._
  val parallelism = 10
  val connectionPool = Http().cachedHostConnectionPoolHttps[String]("api.iextrading.com")
  val listOfSymbols = symbols.toList

  val outputPath = "out.txt"

  Source(listOfSymbols)
    .mapAsync(parallelism) { stockSymbol =>
      // The pool expects (HttpRequest, context) pairs; the symbol is carried along as context.
      Future((HttpRequest(uri = s"https://api.iextrading.com/1.0/stock/${stockSymbol.symbol}/company"), stockSymbol.symbol))
    }
    .via(connectionPool)
    .map {
      case (Success(response), _) => Unmarshal(response.entity).to[Company]
      case (Failure(ex), symbol) =>
        println(s"Unable to fetch char data for $symbol")
        "x"
    }
    .runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
    .onComplete { _ =>
      bufferedSource.close
      actorSystem.terminate()
    }

This is the problematic line:

runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))

which doesn't compile; the compiler gives me this mysterious-looking error:

Type mismatch, expected Graph[SinkShape[Any], NotInferedMat2], actual Sink[ByteString, Future[IOResult]]

If I change the sink to Sink.ignore or println(_), it works.

I'd appreciate a more detailed explanation.


Answers (1)

Jeffrey Chung

Reputation: 19527

As the compiler is indicating, the types don't match. In the call to .map...

.map {
  case (Success(response), _) =>
    Unmarshal(response.entity).to[Company]
  case (Failure(ex), symbol)  =>
    println(s"Unable to fetch char data for $symbol")
    "x"
}

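...you're returning either a Future[Company] (Unmarshal(...).to[Company] returns a Future) or a String, so the compiler infers their closest common supertype (their "least upper bound"), which the error reports as Any. FileIO.toPath, however, is a Sink[ByteString, Future[IOResult]]: it only accepts elements of type ByteString. Sink.ignore compiles because it is a Sink[Any, Future[Done]] and accepts elements of any type.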
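To see the fix in isolation, here is a minimal offline sketch, assuming Akka 2.6 or later (where an implicit ActorSystem also supplies the Materializer). Every branch of the map produces a ByteString, so the stream's element type matches what FileIO.toPath expects. The (Try[String], String) pairs are hypothetical stand-ins for what the connection pool emits, and out.txt is written with FileIO's default open options.

```scala
import java.nio.file.Paths
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

object ByteStringSinkDemo {
  def main(args: Array[String]): Unit = {
    // Akka 2.6+: an implicit ActorSystem is enough to materialize streams.
    implicit val system: ActorSystem = ActorSystem("bytestring-demo")

    // Hypothetical stand-ins for the (Try[response], symbol) pairs from the pool.
    val results: List[(Try[String], String)] = List(
      (Success("""{"symbol":"AAPL"}"""), "AAPL"),
      (Failure(new RuntimeException("timeout")), "MSFT")
    )

    val done = Source(results)
      .map {
        // Both branches yield ByteString, so the element type is ByteString, not Any.
        case (Success(body), _)   => ByteString(body + "\n")
        case (Failure(_), symbol) => ByteString(s"failed: $symbol\n")
      }
      .runWith(FileIO.toPath(Paths.get("out.txt")))

    // Wait for the file write to finish before shutting the system down.
    val result: IOResult = Await.result(done, 10.seconds)
    println(s"wrote ${result.count} bytes")
    system.terminate()
  }
}
```

The same rule applies to the original code: whatever the map returns in each case must be a ByteString (or be flattened into ByteStrings) before it reaches the file sink.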

One approach is to send the response body to the file sink without unmarshalling it:

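Source(listOfSymbols)
  .mapAsync(parallelism) {
    ...
  }
  .via(connectionPool)
  .map {
    // The pool emits (Try[HttpResponse], String); entity.dataBytes is a Source[ByteString, Any]
    case (Success(response), _) => response.entity.dataBytes
    case (Failure(ex), symbol) =>
      println(s"Unable to fetch char data for $symbol")
      Source.empty[ByteString]
  }
  .flatMapConcat(identity) // flatten the per-response Sources into one stream of ByteString
  .runWith(FileIO.toPath(...))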

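A fuller, offline sketch of this approach, assuming Akka 2.6 or later with akka-http on the classpath. The pooled responses are built locally with HttpResponse and HttpEntity instead of coming from api.iextrading.com, a newline is appended after each body so the records stay separable (with a plain flatMapConcat the bodies are written back to back), and failed requests are logged and replaced by Source.empty. The file name companies.txt is an assumption.

```scala
import java.nio.file.Paths
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpResponse}
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

object DataBytesToFileDemo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("databytes-demo")

    // Hypothetical, locally built stand-ins for what cachedHostConnectionPoolHttps emits.
    val pooled: List[(Try[HttpResponse], String)] = List(
      (Success(HttpResponse(entity = HttpEntity("""{"symbol":"AAPL"}"""))), "AAPL"),
      (Failure(new RuntimeException("connection refused")), "MSFT"),
      (Success(HttpResponse(entity = HttpEntity("""{"symbol":"IBM"}"""))), "IBM")
    )

    val done = Source(pooled)
      .map {
        // Stream the body bytes, then a newline so each record ends on its own line.
        case (Success(response), _) =>
          response.entity.dataBytes.concat(Source.single(ByteString("\n")))
        // A failed request contributes no bytes to the file.
        case (Failure(ex), symbol) =>
          println(s"Unable to fetch data for $symbol: ${ex.getMessage}")
          Source.empty[ByteString]
      }
      .flatMapConcat(identity) // one Source per response, consumed in order
      .runWith(FileIO.toPath(Paths.get("companies.txt")))

    // Block until the file sink has finished, then shut down.
    Await.result(done, 10.seconds)
    system.terminate()
  }
}
```

Because flatMapConcat drains one response body at a time and FileIO writes each chunk as it arrives, the whole data set never has to be held in memory.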
