I am trying to write code that consumes a stream of tickers (a company's stock exchange symbol) and fetches company information from a REST API for each ticker.
I want to fetch information for multiple companies asynchronously.
I would like to write the results to a file continuously, because the entire data set might not fit into memory.
Following the Akka Streams documentation and other resources I found on this subject, I came up with the following code (some parts are omitted for brevity):
implicit val actorSystem: ActorSystem = ActorSystem("stock-fetcher-system")
implicit val materializer: ActorMaterializer = ActorMaterializer(None, Some("StockFetcher"))(actorSystem)
implicit val context = actorSystem.dispatcher
import CompanyJsonMarshaller._
val parallelism = 10
val connectionPool = Http().cachedHostConnectionPoolHttps[String](s"api.iextrading.com")
val listOfSymbols = symbols.toList
val outputPath = "out.txt"
Source(listOfSymbols)
  .mapAsync(parallelism) { stockSymbol =>
    // Pair each request with its symbol so failures can be reported later
    Future((HttpRequest(uri = s"https://api.iextrading.com/1.0/stock/${stockSymbol.symbol}/company"), stockSymbol.symbol))
  }
  .via(connectionPool)
  .map {
    case (Success(response), _) => Unmarshal(response.entity).to[Company]
    case (Failure(ex), symbol) =>
      println(s"Unable to fetch char data for $symbol")
      "x"
  }
  .runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
  .onComplete { _ =>
    bufferedSource.close
    actorSystem.terminate()
  }
This is the problematic line:
.runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
which doesn't compile; the compiler gives me this mysterious-looking error:
Type mismatch, expected: Graph[SinkShape[Any], NotInferedMat2], actual: Sink[ByteString, Future[IOResult]]
If I change the sink to Sink.ignore or println(_), it works.
I'd appreciate a more detailed explanation.
As the compiler is indicating, the types don't match. In the call to .map
...
.map {
  case (Success(response), _) =>
    Unmarshal(response.entity).to[Company]
  case (Failure(ex), symbol) =>
    println(s"Unable to fetch char data for $symbol")
    "x"
}
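...you're returning either a Future[Company] (which is what Unmarshal(response.entity).to[Company] produces) or a String, so the compiler infers their closest common supertype (their "least upper bound"), which the error reports as Any. FileIO.toPath is a Sink[ByteString, Future[IOResult]], so it expects input elements of type ByteString, not Any. Sink.ignore compiles because it is a Sink[Any, Future[Done]] and accepts elements of any type.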
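The same inference can be reproduced in plain Scala without Akka. This is only an illustrative sketch: Company is a hypothetical stand-in for the question's case class (a plain value rather than a Future[Company], to keep it synchronous), Array[Byte] stands in for akka.util.ByteString, and Consumer is a hypothetical minimal type that mimics how Sink[-In, +Mat] is contravariant in its input.

```scala
import java.nio.charset.StandardCharsets
import scala.util.{Failure, Success, Try}

object LubDemo {
  // Hypothetical stand-in for the question's Company case class.
  final case class Company(symbol: String, name: String)

  // Hypothetical stand-in for the (Try[HttpResponse], String) pairs the pool emits.
  val results: List[(Try[Company], String)] = List(
    (Success(Company("AAPL", "Apple Inc.")), "AAPL"),
    (Failure(new RuntimeException("timeout")), "MSFT")
  )

  // Like the question's .map: the branches return different types, so the
  // element type can only be a common supertype of both. Any is one of them.
  val mixed: List[Any] = results.map {
    case (Success(company), _) => company
    case (Failure(_), symbol)  => s"Unable to fetch data for $symbol"
  }

  // The fix: every branch produces the type the consumer expects
  // (Array[Byte] plays the role of ByteString here).
  val encoded: List[Array[Byte]] = results.map {
    case (Success(company), _) => s"$company\n".getBytes(StandardCharsets.UTF_8)
    case (Failure(_), symbol)  => s"Unable to fetch data for $symbol\n".getBytes(StandardCharsets.UTF_8)
  }

  // Hypothetical consumer, contravariant in its input like Sink[-In, +Mat].
  final class Consumer[-A](val accept: A => Unit)

  // Plays the role of Sink.ignore: it accepts elements of any type.
  val ignore: Consumer[Any] = new Consumer[Any](_ => ())

  // Compiles by contravariance: a Consumer[Any] may be used where a
  // Consumer[Array[Byte]] is expected. The reverse assignment would not compile.
  val ignoreBytes: Consumer[Array[Byte]] = ignore

  // Feeds every element to the consumer and returns how many were consumed.
  // drain(mixed, ignoreBytes) would not compile: it is the same mismatch
  // as passing a Sink[ByteString, _] to a stream of Any.
  def drain[A](elems: List[A], consumer: Consumer[A]): Int = {
    elems.foreach(consumer.accept)
    elems.size
  }
}
```

In Akka terms, the equivalent fix would be to make both branches of the stage before runWith emit ByteString, or to skip unmarshalling entirely.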
One approach is to skip unmarshalling and stream the raw response bodies into the file sink:
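Source(listOfSymbols)
  .mapAsync(parallelism) {
    ...
  }
  .via(connectionPool)
  // The pool emits (Try[HttpResponse], String) pairs: keep successful responses
  // (failed requests are dropped here) and take each entity's byte stream
  .collect { case (Success(response), _) => response.entity.dataBytes } // Source[ByteString, Any]
  .flatMapConcat(identity) // concatenate the byte streams, one response after another
  .runWith(FileIO.toPath(...))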