Reputation: 53826
I'm accessing data from a REST endpoint:
"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"
To poll it once per second, I use an infinite while(true) loop that sends a message to an actor every second; the actor then makes the REST request.
The actor that accesses the data is:
object ProductTickerRestActor {
  case class StringData(data: String)
}

class ProductTickerRestActor extends Actor {
  override def receive: PartialFunction[Any, Unit] = {
    case ProductTickerRestActor.StringData(data) =>
      try {
        println("in ProductTickerRestActor")
        val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
        println("rData : " + rData)
      }
      catch {
        case e: Exception =>
          println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
      }
    case msg => println(s"I cannot understand ${msg.toString}")
  }
}
I start the application using:
object ExchangeModelDataApplication {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystemConfig.getActorSystem
    val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
    val throttler = Throttlers.getThrottler(priceDataActor)
    // Blocks the main thread forever, sending one message per second
    while (true) {
      throttler ! ProductTickerRestActor.StringData("test")
      Thread.sleep(1000)
    }
  }
}
Throttler:
object Throttlers {
  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(priceDataActor: ActorRef) =
    Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      .throttle(1, 1.second)
      .to(Sink.actorRef(priceDataActor, NotUsed))
      .run()
}
How can I run the following code asynchronously, instead of blocking in an infinite loop?
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
Also, the throttler may be redundant in this case, as I'm already throttling requests within the loop.
Upvotes: 0
Views: 628
Reputation: 27356
Sending a request every second is not a good idea. If, for some reason, a request is delayed, you are going to get a lot of requests piled up. Instead, send the next request one second after the previous request completes.
Because this code uses a synchronous GET request, you can just send the next request one second after mkString returns.
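As a rough sketch of that "wait one second after the previous call returns" idea without any Akka involved, the JDK's scheduleWithFixedDelay measures the delay from the end of one run to the start of the next. FixedDelayPoller and its start method are names made up for this illustration; only the URL and the fromURL call come from the question.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.io.Source

object FixedDelayPoller {
  // Hypothetical helper: runs `fetch`, waits `delayMillis` after it returns, then repeats.
  def start(delayMillis: Long)(fetch: () => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task: Runnable = () =>
      // An uncaught exception would cancel all further runs, so catch it here.
      try fetch()
      catch { case e: Exception => println(s"Request failed: ${e.getMessage}") }
    scheduler.scheduleWithFixedDelay(task, 0L, delayMillis, TimeUnit.MILLISECONDS)
    scheduler
  }

  def main(args: Array[String]): Unit = {
    val url = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"
    val scheduler = start(1000L) { () =>
      val source = Source.fromURL(url)
      try println("rData : " + source.mkString)
      finally source.close()
    }
    Thread.sleep(10000) // let it poll for about ten seconds
    scheduler.shutdown()
  }
}
```

The request still blocks a thread here, but it is a dedicated scheduler thread rather than the main thread or an actor, and a slow response simply pushes the next request back instead of letting requests pile up.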
But using synchronous requests is not a good way to consume a RESTful API in Akka. It blocks the actor's receive method until the request is complete, which ties up a dispatcher thread and can eventually block the whole ActorSystem.
Instead, use Akka HTTP and singleRequest to perform an asynchronous request:
Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))
This returns a Future. Make the new request one second after the request completes (e.g. using onComplete on the Future).
Not only is this safer and non-blocking, it also provides much more control over the REST API call than fromURL.
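One way this could look inside an actor is sketched below, assuming Akka 2.6 classic actors and Akka HTTP 10.2. Instead of calling onComplete directly, it pipes the result back to the actor as a message, so the follow-up runs on the actor's own thread of control. TickerPoller, Poll, Result, and TickerUri are names invented for this sketch; treat it as a starting point rather than a finished implementation.

```scala
import akka.actor.{Actor, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object TickerPoller {
  case object Poll                            // hypothetical message: start one request
  final case class Result(body: Try[String])  // hypothetical message: outcome of one request
  val TickerUri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"
}

class TickerPoller extends Actor {
  import TickerPoller._
  import context.dispatcher
  implicit val system: ActorSystem = context.system

  // Kick off the first request as soon as the actor starts.
  override def preStart(): Unit = self ! Poll

  def receive: Receive = {
    case Poll =>
      val body: Future[String] =
        Http().singleRequest(HttpRequest(uri = TickerUri))
          .flatMap(_.entity.toStrict(10.seconds)) // assumed timeout for reading the body
          .map(_.data.utf8String)
      // Wrap success and failure alike so the actor always hears back.
      body.transform(outcome => Success(Result(outcome))).pipeTo(self)

    case Result(outcome) =>
      outcome match {
        case Success(json) => println(s"rData : $json")
        case Failure(e)    => println(s"Request failed: ${e.getMessage}")
      }
      // Only now schedule the next request: one second after this one finished.
      context.system.scheduler.scheduleOnce(1.second, self, Poll)
  }
}
```

Starting it is just actorSystem.actorOf(Props[TickerPoller]()); no while loop, Thread.sleep, or throttler is needed, because each request schedules its own successor.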
Upvotes: 1
Reputation: 20551
I would just use Akka Streams for this, along with Akka HTTP. Using Akka 2.6.x, something along these lines would be sufficient for 1 request/second:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object HTTPRepeatedly {
  // In Akka 2.6 the implicit system also supplies the stream materializer
  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher

  val sourceFromHttp: Source[String, NotUsed] =
    Source.repeat("test") // Not sure what "test" is actually used for here...
      .throttle(1, 1.second) // let at most one element through per second
      .map { str =>
        HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
      }.mapAsync(1) { req => // parallelism 1: at most one request in flight
        Http().singleRequest(req)
      }.mapAsync(1)(_.entity.toStrict(1.minute)) // read the whole response body
      .map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
}
Then you could, for instance, run it like this (for simplicity, put this in a main within HTTPRepeatedly so the implicits are in scope):
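import akka.Done
import scala.concurrent.Future

val done: Future[Done] =
  sourceFromHttp
    .take(10) // stop after 10 requests
    .runWith(Sink.foreach { rData => println(s"rData: $rData") })

// Block only here, at the edge of the program, until the stream finishes
scala.concurrent.Await.result(done, 11.minutes)
system.terminate()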
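If you want to try the throttling behaviour without hitting the real endpoint, the same pipeline shape can take the HTTP call as a parameter. This is only a sketch: ThrottledPolling, poll, and the fetch parameter are hypothetical names, and fetch stands in for the Http().singleRequest(...) chain above.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ThrottledPolling {
  // Hypothetical helper: emits the result of `fetch` at most once per `every`.
  def poll(fetch: () => Future[String], every: FiniteDuration): Source[String, NotUsed] =
    Source.repeat(())
      .throttle(1, every)
      .mapAsync(1)(_ => fetch()) // parallelism 1: the next fetch waits for the previous one

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("throttled-polling")
    // Stubbed fetch so the example does not depend on the network.
    val stub = () => Future.successful("""{"price":"stubbed"}""")
    val results = poll(stub, 1.second).take(3).runWith(Sink.seq)
    println(Await.result(results, 10.seconds))
    system.terminate()
  }
}
```

Because mapAsync(1) allows only one outstanding call, a slow response delays the following request rather than letting requests accumulate, which addresses the pile-up concern raised in the other answer.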
Upvotes: 1